diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index b37b159d8aae9..7adab67bc619e 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -28,7 +28,7 @@ use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk}; use risingwave_common::catalog::KAFKA_TIMESTAMP_COLUMN_NAME; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::Datum; +use risingwave_common::types::{Datum, Scalar}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::catalog::{ SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo, @@ -147,6 +147,33 @@ pub struct MessageMeta { offset: String, } +impl MessageMeta { + /// Extract the value for the given column. + /// + /// Returns `None` if the column is not a meta column. + fn value_for_column(&self, desc: &SourceColumnDesc) -> Option { + match desc.column_type { + // Row id columns are filled with `NULL` here and will be filled with the real + // row id generated by `RowIdGenExecutor` later. + SourceColumnType::RowId => Datum::None.into(), + // Extract the offset from the meta data. + SourceColumnType::Offset => Datum::Some(self.offset.as_str().into()).into(), + // Extract custom meta data per connector. + SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &self.meta => { + assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name"); + kafka_meta.timestamp.map(|ts| { + risingwave_common::cast::i64_to_timestamptz(ts) + .unwrap() + .to_scalar_value() + }).into() + } + + // For other cases, return `None`. 
+ SourceColumnType::Meta | SourceColumnType::Normal => None, + } + } +} + trait OpAction { type Output; @@ -257,44 +284,28 @@ impl SourceStreamChunkRowWriter<'_> { &mut self, mut f: impl FnMut(&SourceColumnDesc) -> Result, ) -> Result<()> { - let mut f = |desc: &SourceColumnDesc| { - if let Some(row_meta) = &self.row_meta { - let datum = match desc.column_type { - // Row id columns are filled with `NULL` here and will be filled with the real - // row id generated by `RowIdGenExecutor` later. - SourceColumnType::RowId => Datum::None, - // Extract the offset from the meta data. - SourceColumnType::Offset => Datum::Some(row_meta.offset.as_str().into()), - // Extract custom meta data per connector. - SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &row_meta.meta => { - assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name"); - kafka_meta.timestamp.map(|ts| { - risingwave_common::cast::i64_to_timestamptz(ts) - .unwrap() - .into() - }) - } - - // For other cases, call the inner closure. - SourceColumnType::Meta | SourceColumnType::Normal => return f(desc), - }; - Ok(A::output_for(datum)) + let mut f_with_meta = |desc: &SourceColumnDesc| { + if let Some(meta_value) = + (self.row_meta.as_ref()).and_then(|row_meta| row_meta.value_for_column(desc)) + { + Ok(A::output_for(meta_value)) } else { f(desc) } }; - let mut modified = Vec::with_capacity(self.descs.len()); + // Columns to which changes have been applied. Used to roll back when an error occurs.
+ let mut applied_columns = Vec::with_capacity(self.descs.len()); let result = self .descs .iter() .zip_eq_fast(self.builders.iter_mut()) - .try_for_each(|(desc, builder)| -> Result<()> { - let output = f(desc)?; - A::apply(builder, output); - modified.push(builder); - Ok(()) + .try_for_each(|(desc, builder)| { + f_with_meta(desc).map(|output| { + A::apply(builder, output); + applied_columns.push(builder); + }) }); match result { @@ -304,7 +315,7 @@ impl SourceStreamChunkRowWriter<'_> { } Err(e) => { tracing::warn!("failed to parse source data: {}", e); - for builder in modified { + for builder in applied_columns { A::rollback(builder); } Err(e)