Skip to content

Commit

Permalink
refactor meta value method and closure name
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 27, 2023
1 parent 69ea134 commit f9b07af
Showing 1 changed file with 42 additions and 33 deletions.
75 changes: 42 additions & 33 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
use risingwave_common::catalog::KAFKA_TIMESTAMP_COLUMN_NAME;
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::Datum;
use risingwave_common::types::{Datum, Scalar};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::catalog::{
SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo,
Expand Down Expand Up @@ -147,6 +147,33 @@ pub struct MessageMeta {
offset: String,
}

impl MessageMeta {
/// Extract the value for the given column.
///
/// Returns `None` if the column is not a meta column.
fn value_for_column(&self, desc: &SourceColumnDesc) -> Option<Datum> {
match desc.column_type {
// Row id columns are filled with `NULL` here and will be filled with the real
// row id generated by `RowIdGenExecutor` later.
SourceColumnType::RowId => Datum::None.into(),
// Extract the offset from the meta data.
SourceColumnType::Offset => Datum::Some(self.offset.as_str().into()).into(),
// Extract custom meta data per connector.
SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &self.meta => {
assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name");
kafka_meta.timestamp.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
.unwrap()
.to_scalar_value()
}).into()
}

// For other cases, return `None`.
SourceColumnType::Meta | SourceColumnType::Normal => None,
}
}
}

trait OpAction {
type Output;

Expand Down Expand Up @@ -257,44 +284,26 @@ impl SourceStreamChunkRowWriter<'_> {
&mut self,
mut f: impl FnMut(&SourceColumnDesc) -> Result<A::Output>,
) -> Result<()> {
let mut f = |desc: &SourceColumnDesc| {
if let Some(row_meta) = &self.row_meta {
let datum = match desc.column_type {
// Row id columns are filled with `NULL` here and will be filled with the real
// row id generated by `RowIdGenExecutor` later.
SourceColumnType::RowId => Datum::None,
// Extract the offset from the meta data.
SourceColumnType::Offset => Datum::Some(row_meta.offset.as_str().into()),
// Extract custom meta data per connector.
SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &row_meta.meta => {
assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name");
kafka_meta.timestamp.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
.unwrap()
.into()
})
}

// For other cases, call the inner closure.
SourceColumnType::Meta | SourceColumnType::Normal => return f(desc),
};
Ok(A::output_for(datum))
let mut f_with_meta = |desc: &SourceColumnDesc| {
if let Some(meta_value) =
(self.row_meta.as_ref()).and_then(|row_meta| row_meta.value_for_column(desc))
{
Ok(A::output_for(meta_value))
} else {
f(desc)
}
};

let mut modified = Vec::with_capacity(self.descs.len());
// Columns that changes have been applied to. Used to rollback when an error occurs.
let mut applied_columns = Vec::with_capacity(self.descs.len());

let result = self
.descs
.iter()
let result = (self.descs.iter())
.zip_eq_fast(self.builders.iter_mut())
.try_for_each(|(desc, builder)| -> Result<()> {
let output = f(desc)?;
A::apply(builder, output);
modified.push(builder);
Ok(())
.try_for_each(|(desc, builder)| {
f_with_meta(desc).map(|output| {
A::apply(builder, output);
applied_columns.push(builder);
})
});

match result {
Expand All @@ -304,7 +313,7 @@ impl SourceStreamChunkRowWriter<'_> {
}
Err(e) => {
tracing::warn!("failed to parse source data: {}", e);
for builder in modified {
for builder in applied_columns {
A::rollback(builder);
}
Err(e)
Expand Down

0 comments on commit f9b07af

Please sign in to comment.