Skip to content

Commit

Permalink
perf(connector): avoid allocation for metadata column when parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao committed Jun 10, 2024
1 parent 9e137fc commit 3e01795
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 83 deletions.
63 changes: 32 additions & 31 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use risingwave_common::bail;
use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, TABLE_NAME_COLUMN_NAME};
use risingwave_common::log::LogSuppresser;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{Datum, DatumCow, Scalar, ScalarImpl};
use risingwave_common::types::{Datum, DatumCow, DatumRef, ScalarRefImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::tracing::InstrumentStream;
use risingwave_connector_codec::decoder::avro::MapHandling;
Expand Down Expand Up @@ -203,52 +203,51 @@ pub struct MessageMeta<'a> {
offset: &'a str,
}

impl MessageMeta<'_> {
impl<'a> MessageMeta<'a> {
/// Extract the value for the given column.
///
/// Returns `None` if the column is not a meta column.
fn value_for_column(self, desc: &SourceColumnDesc) -> Option<Datum> {
match desc.column_type {
fn value_for_column(self, desc: &SourceColumnDesc) -> Option<DatumRef<'a>> {
let datum: DatumRef<'_> = match desc.column_type {
// Row id columns are filled with `NULL` here and will be filled with the real
// row id generated by `RowIdGenExecutor` later.
SourceColumnType::RowId => Datum::None.into(),
SourceColumnType::RowId => None,
// Extract the offset from the meta data.
SourceColumnType::Offset => Datum::Some(self.offset.into()).into(),
SourceColumnType::Offset => Some(self.offset.into()),
// Extract custom meta data per connector.
SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.meta => {
assert_eq!(
desc.name.as_str(),
KAFKA_TIMESTAMP_COLUMN_NAME,
"unexpected kafka meta column name"
);
kafka_meta
.timestamp
.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
.unwrap()
.to_scalar_value()
})
.into()
kafka_meta.timestamp.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
.unwrap()
.into()
})
}
SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.meta => {
assert_eq!(
desc.name.as_str(),
TABLE_NAME_COLUMN_NAME,
"unexpected cdc meta column name"
);
Datum::Some(cdc_meta.full_table_name.as_str().into()).into()
Some(cdc_meta.full_table_name.as_str().into())
}

// For other cases, return `None`.
SourceColumnType::Meta | SourceColumnType::Normal => None,
}
SourceColumnType::Meta | SourceColumnType::Normal => return None,
};

Some(datum)
}
}

trait OpAction {
type Output<'a>;

fn output_for<'a>(datum: Datum) -> Self::Output<'a>;
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a>;

fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>);

Expand All @@ -263,7 +262,7 @@ impl OpAction for OpActionInsert {
type Output<'a> = DatumCow<'a>;

#[inline(always)]
fn output_for<'a>(datum: Datum) -> Self::Output<'a> {
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
datum.into()
}

Expand All @@ -289,7 +288,7 @@ impl OpAction for OpActionDelete {
type Output<'a> = DatumCow<'a>;

#[inline(always)]
fn output_for<'a>(datum: Datum) -> Self::Output<'a> {
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
datum.into()
}

Expand All @@ -315,8 +314,9 @@ impl OpAction for OpActionUpdate {
type Output<'a> = (DatumCow<'a>, DatumCow<'a>);

#[inline(always)]
fn output_for<'a>(datum: Datum) -> Self::Output<'a> {
(datum.clone().into(), datum.into())
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
let datum = datum.into();
(datum.clone(), datum)
}

#[inline(always)]
Expand Down Expand Up @@ -345,7 +345,7 @@ impl SourceStreamChunkRowWriter<'_> {
}

fn do_action<'a, A: OpAction>(
&mut self,
&'a mut self,
mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output<'a>>,
) -> AccessResult<()> {
let mut parse_field = |desc: &SourceColumnDesc| {
Expand Down Expand Up @@ -411,10 +411,11 @@ impl SourceStreamChunkRowWriter<'_> {
match self.row_meta {
Some(row_meta) => {
if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.meta {
Ok(A::output_for(
extract_cdc_meta_column(cdc_meta, col, desc.name.as_str())?
.unwrap_or(None),
))
Ok(A::output_for(extract_cdc_meta_column(
cdc_meta,
col,
desc.name.as_str(),
)?))
} else {
Err(AccessError::Uncategorized {
message: "CDC metadata not found in the message".to_string(),
Expand All @@ -439,15 +440,15 @@ impl SourceStreamChunkRowWriter<'_> {
return Ok(A::output_for(
self.row_meta
.as_ref()
.map(|ele| ScalarImpl::Utf8(ele.split_id.to_string().into())),
.map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
));
}
(_, &Some(AdditionalColumnType::Offset(_))) => {
// the meta info does not involve spec connector
return Ok(A::output_for(
self.row_meta
.as_ref()
.map(|ele| ScalarImpl::Utf8(ele.offset.to_string().into())),
.map(|ele| ScalarRefImpl::Utf8(ele.offset)),
));
}
(_, &Some(AdditionalColumnType::HeaderInner(ref header_inner))) => {
Expand All @@ -461,7 +462,7 @@ impl SourceStreamChunkRowWriter<'_> {
header_inner.data_type.as_ref(),
)
})
.unwrap_or(None),
.unwrap_or(Datum::None.into()),
))
}
(_, &Some(AdditionalColumnType::Headers(_))) => {
Expand All @@ -477,7 +478,7 @@ impl SourceStreamChunkRowWriter<'_> {
return Ok(A::output_for(
self.row_meta
.as_ref()
.map(|ele| ScalarImpl::Utf8(ele.split_id.to_string().into())),
.map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
));
}
(_, _) => {
Expand Down
18 changes: 9 additions & 9 deletions src/connector/src/parser/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use anyhow::Context;
use bytes::Bytes;
use reqwest::Url;
use risingwave_common::bail;
use risingwave_common::types::Datum;
use risingwave_common::types::{Datum, DatumCow, DatumRef};
use risingwave_pb::data::DataType as PbDataType;

use crate::aws_utils::load_file_descriptor_from_s3;
Expand Down Expand Up @@ -132,19 +132,19 @@ pub(super) async fn bytes_from_url(
}
}

pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option<Datum> {
pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option<DatumRef<'_>> {
match meta {
SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta.extract_timestamp(),
SourceMeta::DebeziumCdc(cdc_meta) => Some(cdc_meta.extract_timestamp()),
_ => None,
}
}

pub fn extract_cdc_meta_column(
cdc_meta: &DebeziumCdcMeta,
pub fn extract_cdc_meta_column<'a>(
cdc_meta: &'a DebeziumCdcMeta,
column_type: &additional_column::ColumnType,
column_name: &str,
) -> AccessResult<Option<Datum>> {
) -> AccessResult<DatumRef<'a>> {
match column_type {
ColumnType::Timestamp(_) => Ok(cdc_meta.extract_timestamp()),
ColumnType::DatabaseName(_) => Ok(cdc_meta.extract_database_name()),
Expand All @@ -162,11 +162,11 @@ pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option<Datum> {
}
}

pub fn extract_header_inner_from_meta(
meta: &SourceMeta,
pub fn extract_header_inner_from_meta<'a>(
meta: &'a SourceMeta,
inner_field: &str,
data_type: Option<&PbDataType>,
) -> Option<Datum> {
) -> Option<DatumCow<'a>> {
match meta {
SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_header_inner(inner_field, data_type), /* expect output of type `bytea` or `varchar` */
_ => None,
Expand Down
27 changes: 11 additions & 16 deletions src/connector/src/source/cdc/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{Datum, Scalar, ScalarImpl, Timestamptz};
use risingwave_common::types::{DatumRef, ScalarRefImpl, Timestamptz};
use risingwave_pb::connector_service::CdcMessage;

use crate::source::base::SourceMessage;
Expand All @@ -30,27 +30,22 @@ pub struct DebeziumCdcMeta {
}

impl DebeziumCdcMeta {
pub fn extract_timestamp(&self) -> Option<Datum> {
Some(
Timestamptz::from_millis(self.source_ts_ms)
.unwrap()
.to_scalar_value(),
)
.into()
pub fn extract_timestamp(&self) -> DatumRef<'_> {
Some(ScalarRefImpl::Timestamptz(
Timestamptz::from_millis(self.source_ts_ms).unwrap(),
))
}

pub fn extract_database_name(&self) -> Option<Datum> {
Some(ScalarImpl::from(
self.full_table_name.as_str()[0..self.db_name_prefix_len].to_string(),
pub fn extract_database_name(&self) -> DatumRef<'_> {
Some(ScalarRefImpl::Utf8(
&self.full_table_name.as_str()[0..self.db_name_prefix_len],
))
.into()
}

pub fn extract_table_name(&self) -> Option<Datum> {
Some(ScalarImpl::from(
self.full_table_name.as_str()[self.db_name_prefix_len..].to_string(),
pub fn extract_table_name(&self) -> DatumRef<'_> {
Some(ScalarRefImpl::Utf8(
&self.full_table_name.as_str()[self.db_name_prefix_len..],
))
.into()
}

pub fn new(full_table_name: String, source_ts_ms: i64, is_transaction_meta: bool) -> Self {
Expand Down
61 changes: 34 additions & 27 deletions src/connector/src/source/kafka/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;

use itertools::Itertools;
use rdkafka::message::{BorrowedMessage, Headers, OwnedHeaders};
use rdkafka::Message;
use risingwave_common::types::{Datum, ListValue, Scalar, ScalarImpl, StructValue};
use risingwave_common::types::{
Datum, DatumCow, DatumRef, ListValue, ScalarImpl, ScalarRefImpl, StructValue,
};
use risingwave_pb::data::data_type::TypeName as PbTypeName;
use risingwave_pb::data::DataType as PbDataType;

Expand All @@ -31,39 +35,42 @@ pub struct KafkaMeta {
}

impl KafkaMeta {
pub fn extract_timestamp(&self) -> Option<Datum> {
self.timestamp
.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
.unwrap()
.to_scalar_value()
})
.into()
pub fn extract_timestamp(&self) -> Option<DatumRef<'_>> {
self.timestamp.map(|ts| {
Some(ScalarRefImpl::Timestamptz(
risingwave_common::cast::i64_to_timestamptz(ts).unwrap(),
))
})
}

pub fn extract_header_inner(
&self,
pub fn extract_header_inner<'a>(
&'a self,
inner_field: &str,
data_type: Option<&PbDataType>,
) -> Option<Datum> {
let target_value = self
.headers
.as_ref()
.iter()
.find_map(|headers| {
headers
.iter()
.find(|header| header.key == inner_field)
.map(|header| header.value)
})
.unwrap_or(None); // if not found the specified column, return None
if let Some(data_type) = data_type
) -> Option<DatumCow<'a>> {
let target_value = self.headers.as_ref().iter().find_map(|headers| {
headers
.iter()
.find(|header| header.key == inner_field)
.map(|header| header.value)
})?; // if not found the specified column, return None

let Some(target_value) = target_value else {
return Some(Datum::None.into());
};

let datum = if let Some(data_type) = data_type
&& data_type.type_name == PbTypeName::Varchar as i32
{
Some(target_value.map(|byte| ScalarImpl::Utf8(String::from_utf8_lossy(byte).into())))
match String::from_utf8_lossy(target_value) {
Cow::Borrowed(str) => Some(ScalarRefImpl::Utf8(str)).into(),
Cow::Owned(string) => Some(ScalarImpl::Utf8(string.into())).into(),
}
} else {
Some(target_value.map(|byte| ScalarImpl::Bytea(byte.into())))
}
Some(ScalarRefImpl::Bytea(target_value)).into()
};

Some(datum)
}

pub fn extract_headers(&self) -> Option<Datum> {
Expand Down

0 comments on commit 3e01795

Please sign in to comment.