From 2a4cd951f02c4504ae483a9ad7c174e1a871fc68 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 26 Sep 2023 17:40:46 +0800 Subject: [PATCH 1/7] move meta assignment into `f` Signed-off-by: Bugen Zhao --- src/connector/benches/parser.rs | 2 +- src/connector/src/lib.rs | 1 + src/connector/src/parser/common.rs | 15 +- src/connector/src/parser/csv_parser.rs | 2 +- src/connector/src/parser/json_parser.rs | 27 ++- src/connector/src/parser/mod.rs | 231 ++++++++++++++--------- src/connector/src/parser/unified/json.rs | 10 +- src/connector/src/parser/unified/util.rs | 2 +- src/source/benches/json_parser.rs | 4 +- src/utils/runtime/src/logger.rs | 6 +- 10 files changed, 172 insertions(+), 128 deletions(-) diff --git a/src/connector/benches/parser.rs b/src/connector/benches/parser.rs index 2c3b666b91d62..81bda6fccb395 100644 --- a/src/connector/benches/parser.rs +++ b/src/connector/benches/parser.rs @@ -91,7 +91,7 @@ async fn parse(parser: JsonParser, column_desc: Vec, input: Ve SourceStreamChunkBuilder::with_capacity(column_desc.clone(), input_inner.len()); for payload in input_inner { let row_writer = builder.row_writer(); - parser.parse_inner(Some(payload), row_writer).await.unwrap(); + parser.parse_inner(payload, row_writer).await.unwrap(); } builder.finish(); } diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 0d1a015f97663..1fba061555f44 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -31,6 +31,7 @@ #![feature(associated_type_defaults)] #![feature(impl_trait_in_assoc_type)] #![feature(iter_from_generator)] +#![feature(if_let_guard)] use std::time::Duration; diff --git a/src/connector/src/parser/common.rs b/src/connector/src/parser/common.rs index 168b13e1b2e22..2aed3c5f7927f 100644 --- a/src/connector/src/parser/common.rs +++ b/src/connector/src/parser/common.rs @@ -12,21 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::borrow::Cow; - use simd_json::{BorrowedValue, ValueAccess}; -pub(crate) fn json_object_smart_get_value<'a, 'b>( +/// Get a value from a json object by key, case insensitive. +/// +/// Returns `None` if the given json value is not an object, or the key is not found. +pub(crate) fn json_object_get_case_insentive<'a, 'b>( v: &'b simd_json::BorrowedValue<'a>, - key: Cow<'b, str>, + key: &'b str, ) -> Option<&'b BorrowedValue<'a>> { let obj = v.as_object()?; - let value = obj.get(key.as_ref()); + let value = obj.get(key); if value.is_some() { - return value; + return value; // fast path } for (k, v) in obj { - if k.eq_ignore_ascii_case(key.as_ref()) { + if k.eq_ignore_ascii_case(key) { return Some(v); } } diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index d77d35db9179e..b159539b031d8 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -115,7 +115,7 @@ impl CsvParser { *headers = fields; // Here we want a row, but got nothing. So it's an error for the `parse_inner` but // has no bad impact on the system. - return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. However, internal parser state was updated.".to_string()))); + return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. 
However, internal parser state was updated.".to_string()))); } writer.insert(|desc| { if let Some(i) = headers.iter().position(|name| name == &desc.name) { diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 7c59318e960a1..027a052fe73dd 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -25,6 +25,7 @@ use super::avro::schema_resolver::ConfluentSchemaResolver; use super::schema_registry::Client; use super::util::{get_kafka_topic, read_schema_from_http, read_schema_from_local}; use super::{EncodingProperties, SchemaRegistryAuth, SpecificParserConfig}; +use crate::only_parse_payload; use crate::parser::avro::util::avro_schema_to_column_descs; use crate::parser::schema_registry::handle_sr_list; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; @@ -106,17 +107,11 @@ impl JsonParser { #[allow(clippy::unused_async)] pub async fn parse_inner( &self, - mut payload: Option>, + mut payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result { - if payload.is_none() { - return Err(RwError::from(ErrorCode::InternalError( - "Empty payload with nonempty key for non-upsert".into(), - ))); - } - let value = - simd_json::to_borrowed_value(&mut payload.as_mut().unwrap()[self.payload_start_idx..]) - .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + let value = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..]) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; let values = if let simd_json::BorrowedValue::Array(arr) = value { arr } else { @@ -195,7 +190,7 @@ impl ByteStreamSourceParser for JsonParser { payload: Option>, writer: SourceStreamChunkRowWriter<'a>, ) -> Result { - self.parse_inner(payload, writer).await + only_parse_payload!(self, payload, writer) } } @@ -257,7 +252,7 @@ mod tests { for payload in get_payload() { let writer = builder.row_writer(); - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } let chunk = builder.finish(); @@ -362,7 +357,7 @@ mod tests { { let writer = builder.row_writer(); let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(); - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } // Parse an incorrect record. @@ -371,14 +366,14 @@ mod tests { // `v2` overflowed. let payload = br#"{"v1": 1, "v2": 65536, "v3": "3"}"#.to_vec(); // ignored the error, and fill None at v2. - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } // Parse a correct record. 
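        // (Same payload as the first record; the overflow error above was
        // ignored and `None` was filled in, so parsing continues as usual.)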
{ let writer = builder.row_writer(); let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(); - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } let chunk = builder.finish(); @@ -448,7 +443,7 @@ mod tests { let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1); { let writer = builder.row_writer(); - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } let chunk = builder.finish(); let (op, row) = chunk.rows().next().unwrap(); @@ -508,7 +503,7 @@ mod tests { let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1); { let writer = builder.row_writer(); - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } let chunk = builder.finish(); let (op, row) = chunk.rows().next().unwrap(); diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 987d049a11f0c..77bc0bd623eb2 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -22,7 +22,7 @@ use csv_parser::CsvParser; pub use debezium::*; use futures::{Future, TryFutureExt}; use futures_async_stream::try_stream; -use itertools::{Either, Itertools}; +use itertools::Either; pub use json_parser::*; pub use protobuf::*; use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk}; @@ -47,8 +47,9 @@ use self::util::get_kafka_topic; use crate::aws_auth::AwsAuthProps; use crate::parser::maxwell::MaxwellParser; use crate::source::{ - BoxSourceStream, SourceColumnDesc, SourceContext, SourceContextRef, SourceEncode, SourceFormat, - SourceMeta, SourceStruct, SourceWithStateStream, SplitId, StreamChunkWithState, + BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, SourceContextRef, + SourceEncode, SourceFormat, SourceMeta, SourceStruct, SourceWithStateStream, SplitId, + StreamChunkWithState, }; mod avro; @@ -93,6 +94,7 @@ impl SourceStreamChunkBuilder { descs: &self.descs, builders: &mut self.builders, op_builder: &mut self.op_builder, + row_meta: None, } } @@ -125,12 +127,18 @@ impl SourceStreamChunkBuilder { } } +pub struct RowMeta { + meta: SourceMeta, + offset: String, +} + /// `SourceStreamChunkRowWriter` is responsible to write one row (Insert/Delete) or two rows /// (Update) to the [`StreamChunk`]. 
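 /// If a `RowMeta` is attached via `with_meta`, the non-payload columns (row id,
 /// offset, meta) are filled from it automatically rather than by the parser's closure.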
pub struct SourceStreamChunkRowWriter<'a> { descs: &'a [SourceColumnDesc], builders: &'a mut [ArrayBuilderImpl], op_builder: &'a mut Vec, + row_meta: Option, } /// `WriteGuard` can't be constructed directly in other mods due to a private field, so it can be @@ -142,7 +150,7 @@ pub struct WriteGuard(()); trait OpAction { type Output; - const DEFAULT_OUTPUT: Self::Output; + fn output_for(datum: Datum) -> Self::Output; fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output); @@ -156,7 +164,10 @@ struct OpActionInsert; impl OpAction for OpActionInsert { type Output = Datum; - const DEFAULT_OUTPUT: Self::Output = None; + #[inline(always)] + fn output_for(datum: Datum) -> Self::Output { + datum + } #[inline(always)] fn apply(builder: &mut ArrayBuilderImpl, output: Datum) { @@ -179,7 +190,10 @@ struct OpActionDelete; impl OpAction for OpActionDelete { type Output = Datum; - const DEFAULT_OUTPUT: Self::Output = None; + #[inline(always)] + fn output_for(datum: Datum) -> Self::Output { + datum + } #[inline(always)] fn apply(builder: &mut ArrayBuilderImpl, output: Datum) { @@ -202,7 +216,10 @@ struct OpActionUpdate; impl OpAction for OpActionUpdate { type Output = (Datum, Datum); - const DEFAULT_OUTPUT: Self::Output = (None, None); + #[inline(always)] + fn output_for(datum: Datum) -> Self::Output { + (datum.clone(), datum) + } #[inline(always)] fn apply(builder: &mut ArrayBuilderImpl, output: (Datum, Datum)) { @@ -224,43 +241,60 @@ impl OpAction for OpActionUpdate { } impl SourceStreamChunkRowWriter<'_> { - #[expect( - clippy::disallowed_methods, - reason = "FIXME: why zip_eq_fast leads to compile error?" - )] fn do_action( &mut self, mut f: impl FnMut(&SourceColumnDesc) -> Result, ) -> Result { + let mut f = |desc: &SourceColumnDesc| { + if let Some(row_meta) = &self.row_meta { + let datum = match desc.column_type { + SourceColumnType::RowId => Datum::None, + SourceColumnType::Offset => Datum::Some(row_meta.offset.as_str().into()), + + SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &row_meta.meta => { + assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name"); + kafka_meta.timestamp.map(|ts| { + risingwave_common::cast::i64_to_timestamptz(ts) + .unwrap() + .into() + }) + } + + SourceColumnType::Meta | SourceColumnType::Normal => return f(desc), + }; + + Ok(A::output_for(datum)) + } else { + f(desc) + } + }; + let mut modify_col = Vec::with_capacity(self.descs.len()); - self.descs + + let result = self + .descs .iter() - .zip_eq(self.builders.iter_mut()) - .enumerate() - .try_for_each(|(idx, (desc, builder))| -> Result<()> { - if desc.is_meta() || desc.is_offset() { - return Ok(()); - } - let output = if desc.is_row_id() { - A::DEFAULT_OUTPUT - } else { - f(desc)? - }; + .zip_eq_fast(self.builders.iter_mut()) + .try_for_each(|(desc, builder)| -> Result<()> { + let output = f(desc)?; A::apply(builder, output); - modify_col.push(idx); - + modify_col.push(builder); Ok(()) - }) - .inspect_err(|e| { - tracing::warn!("failed to parse source data: {}", e); - modify_col.iter().for_each(|idx| { - A::rollback(&mut self.builders[*idx]); - }); - })?; - - A::finish(self); + }); - Ok(WriteGuard(())) + match result { + Ok(_) => { + A::finish(self); + Ok(WriteGuard(())) + } + Err(e) => { + tracing::warn!("failed to parse source data: {}", e); + for builder in modify_col { + A::rollback(builder); + } + Err(e) + } + } } /// Write an `Insert` record to the [`StreamChunk`]. 
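The `do_action` rewrite above hinges on `OpAction::output_for`, which replaces the old `DEFAULT_OUTPUT` constant so that each op kind can turn a real datum (not just `None`) into its output shape: a single `Datum` for insert/delete, an old/new pair for update. A minimal, self-contained sketch of the pattern, with `Datum` simplified to `Option<i32>` and everything else stripped — these are stand-ins, not the actual RisingWave definitions:

    // Sketch only: `Datum` stands in for RisingWave's real datum type.
    type Datum = Option<i32>;

    trait OpAction {
        type Output;
        /// Turn one parsed datum into this op's output shape.
        fn output_for(datum: Datum) -> Self::Output;
    }

    struct OpActionInsert;
    impl OpAction for OpActionInsert {
        type Output = Datum;
        fn output_for(datum: Datum) -> Datum { datum }
    }

    struct OpActionUpdate;
    impl OpAction for OpActionUpdate {
        // An update writes two rows, so each column needs an (old, new) pair.
        type Output = (Datum, Datum);
        fn output_for(datum: Datum) -> (Datum, Datum) { (datum, datum) }
    }

    // A meta value (e.g. the offset) can now be fed through any op uniformly,
    // which is what lets `do_action` handle meta columns inside `f`.
    fn meta_output<A: OpAction>(meta_value: Datum) -> A::Output {
        A::output_for(meta_value)
    }

    fn main() {
        assert_eq!(meta_output::<OpActionInsert>(Some(1)), Some(1));
        assert_eq!(meta_output::<OpActionUpdate>(Some(1)), (Some(1), Some(1)));
    }

The real implementations additionally thread each output into the column's array builder and support rolling it back when a later column fails to parse.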
@@ -276,29 +310,6 @@ impl SourceStreamChunkRowWriter<'_> { self.do_action::(f) } - /// For other op like 'insert', 'update', 'delete', we will leave the hollow for the meta column - /// builder. e.g after insert - /// `data_builder = [1], meta_column_builder = [], op = [insert]` - /// - /// This function is used to fulfill this hollow in `meta_column_builder`. - /// e.g after fulfill - /// `data_builder = [1], meta_column_builder = [1], op = [insert]` - pub fn fulfill_meta_column( - &mut self, - mut f: impl FnMut(&SourceColumnDesc) -> Option, - ) -> Result { - self.descs - .iter() - .zip_eq_fast(self.builders.iter_mut()) - .for_each(|(desc, builder)| { - if let Some(output) = f(desc) { - builder.append(output); - } - }); - - Ok(WriteGuard(())) - } - /// Write a `Delete` record to the [`StreamChunk`]. /// /// # Arguments @@ -327,6 +338,39 @@ impl SourceStreamChunkRowWriter<'_> { } } +impl SourceStreamChunkRowWriter<'_> { + /// For other op like 'insert', 'update', 'delete', we will leave the hollow for the meta column + /// builder. e.g after insert + /// `data_builder = [1], meta_column_builder = [], op = [insert]` + /// + /// This function is used to fulfill this hollow in `meta_column_builder`. + /// e.g after fulfill + /// `data_builder = [1], meta_column_builder = [1], op = [insert]` + #[deprecated] + pub fn fulfill_meta_column( + &mut self, + mut f: impl FnMut(&SourceColumnDesc) -> Option, + ) -> Result { + self.descs + .iter() + .zip_eq_fast(self.builders.iter_mut()) + .for_each(|(desc, builder)| { + if let Some(output) = f(desc) { + builder.append(output); + } + }); + + Ok(WriteGuard(())) + } + + fn with_meta(self, row_meta: RowMeta) -> Self { + Self { + row_meta: Some(row_meta), + ..self + } + } +} + /// Transaction control message. Currently only used by Debezium messages. 
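 /// A `Begin` opens a transaction identified by `id`; the matching `Commit`
 /// with the same `id` closes it.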
pub enum TransactionControl { Begin { id: Box }, @@ -441,7 +485,14 @@ async fn into_chunk_stream(mut parser: P, data_stream let old_op_num = builder.op_num(); match parser - .parse_one_with_txn(msg.key, msg.payload, builder.row_writer()) + .parse_one_with_txn( + msg.key, + msg.payload, + builder.row_writer().with_meta(RowMeta { + meta: msg.meta, + offset: msg_offset, + }), + ) .await { Ok(Either::Left(WriteGuard(_))) => { @@ -453,37 +504,37 @@ async fn into_chunk_stream(mut parser: P, data_stream *len += new_op_num - old_op_num; } - // fill in meta column for specific source and offset column if needed - for _ in old_op_num..new_op_num { - let f = - |desc: &SourceColumnDesc| -> Option { - if desc.is_meta() && let SourceMeta::Kafka(kafka_meta) = &msg.meta { - match desc.name.as_str() { - KAFKA_TIMESTAMP_COLUMN_NAME => { - Some(kafka_meta.timestamp.map(|ts| { - risingwave_common::cast::i64_to_timestamptz(ts) - .unwrap() - .into() - })) - } - _ => { - unreachable!( - "kafka will not have this meta column: {}", - desc.name - ) - } - } - } else if desc.is_offset() { - Some(Some(msg_offset.as_str().into())) - } else { - // None will be ignored by `fulfill_meta_column` - None - } - }; - - // fill in meta or offset column if any - builder.row_writer().fulfill_meta_column(f)?; - } + // // fill in meta column for specific source and offset column if needed + // for _ in old_op_num..new_op_num { + // let f = + // |desc: &SourceColumnDesc| -> Option { + // if desc.is_meta() && let SourceMeta::Kafka(kafka_meta) = &msg.meta { + // match desc.name.as_str() { + // KAFKA_TIMESTAMP_COLUMN_NAME => { + // Some(kafka_meta.timestamp.map(|ts| { + // risingwave_common::cast::i64_to_timestamptz(ts) + // .unwrap() + // .into() + // })) + // } + // _ => { + // unreachable!( + // "kafka will not have this meta column: {}", + // desc.name + // ) + // } + // } + // } else if desc.is_offset() { + // Some(Some(msg_offset.as_str().into())) + // } else { + // // None will be ignored by `fulfill_meta_column` + // None + // } + // }; + + // // // fill in meta or offset column if any + // // builder.row_writer().fulfill_meta_column(f)?; + // } } Ok(Either::Right(txn_ctl)) => { diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index c16d5f7668a4b..0f50b540690be 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -28,7 +28,7 @@ use risingwave_common::util::iter_util::ZipEqFast; use simd_json::{BorrowedValue, TryTypeError, ValueAccess, ValueType}; use super::{Access, AccessError, AccessResult}; -use crate::parser::common::json_object_smart_get_value; +use crate::parser::common::json_object_get_case_insentive; use crate::parser::unified::avro::extract_decimal; #[derive(Clone, Debug)] @@ -441,7 +441,7 @@ impl JsonParseOptions { .zip_eq_fast(struct_type_info.types()) .map(|(field_name, field_type)| { self.parse( - json_object_smart_get_value(value, field_name.into()) + json_object_get_case_insentive(value, field_name) .unwrap_or(&BorrowedValue::Static(simd_json::StaticNode::Null)), Some(field_type), ) @@ -556,11 +556,11 @@ where { fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> AccessResult { let mut value = &self.value; - for (idx, key) in path.iter().enumerate() { + for (idx, &key) in path.iter().enumerate() { if let Some(sub_value) = if self.options.ignoring_keycase { - json_object_smart_get_value(value, (*key).into()) + json_object_get_case_insentive(value, key) } else { - value.get(*key) + value.get(key) } { 
value = sub_value; } else { diff --git a/src/connector/src/parser/unified/util.rs b/src/connector/src/parser/unified/util.rs index 2e63ee71a5a07..0f84e8d7c5d07 100644 --- a/src/connector/src/parser/unified/util.rs +++ b/src/connector/src/parser/unified/util.rs @@ -28,7 +28,7 @@ pub fn apply_delete_on_stream_chunk_writer( match res { Ok(datum) => Ok(datum), Err(e) => { - tracing::error!(name=?column.name, data_type=?&column.data_type, err=?e, "delete column error"); + tracing::error!(name=column.name, data_type=%column.data_type, err=%e, "delete column error"); if column.is_pk { // It should be an error when pk column is missing in the message Err(e)? diff --git a/src/source/benches/json_parser.rs b/src/source/benches/json_parser.rs index 70df93b902f57..e54a51befa9f1 100644 --- a/src/source/benches/json_parser.rs +++ b/src/source/benches/json_parser.rs @@ -85,11 +85,11 @@ fn generate_json_row(rng: &mut impl Rng) -> String { ) } -fn generate_json_rows() -> Vec>> { +fn generate_json_rows() -> Vec> { let mut rng = rand::thread_rng(); let mut records = Vec::with_capacity(NUM_RECORDS); for _ in 0..NUM_RECORDS { - records.push(Some(generate_json_row(&mut rng).into_bytes())); + records.push(generate_json_row(&mut rng).into_bytes()); } records } diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index 4a4b77936b800..58070c6d786e8 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -36,17 +36,13 @@ fn configure_risingwave_targets_fmt(targets: filter::Targets) -> filter::Targets targets // force a lower level for important logs .with_target("risingwave_stream", Level::DEBUG) - .with_target("risingwave_storage", Level::DEBUG) // force a higher level for noisy logs + .with_target("risingwave_storage", Level::INFO) .with_target("risingwave_sqlparser", Level::INFO) .with_target("pgwire", Level::INFO) .with_target(PGWIRE_QUERY_LOG, Level::OFF) // force a higher level for foyer logs .with_target("foyer", Level::WARN) - .with_target("foyer_common", Level::WARN) - .with_target("foyer_intrusive", Level::WARN) - .with_target("foyer_memory", Level::WARN) - .with_target("foyer_storage", Level::WARN) // disable events that are too verbose .with_target("events", Level::ERROR) } From 6f44c8bb89a52ed69c1fe200d4423e5ac8ae101b Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 26 Sep 2023 17:52:28 +0800 Subject: [PATCH 2/7] refine interfaces Signed-off-by: Bugen Zhao --- src/connector/src/parser/mod.rs | 92 +++++++-------------------------- 1 file changed, 19 insertions(+), 73 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 77bc0bd623eb2..af788f1ca6f85 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -240,6 +240,15 @@ impl OpAction for OpActionUpdate { } } +impl SourceStreamChunkRowWriter<'_> { + fn with_meta(self, row_meta: RowMeta) -> Self { + Self { + row_meta: Some(row_meta), + ..self + } + } +} + impl SourceStreamChunkRowWriter<'_> { fn do_action( &mut self, @@ -248,9 +257,12 @@ impl SourceStreamChunkRowWriter<'_> { let mut f = |desc: &SourceColumnDesc| { if let Some(row_meta) = &self.row_meta { let datum = match desc.column_type { + // Row id columns are filled with `NULL` here and will be filled with the real + // row id generated by `RowIdGenExecutor` later. SourceColumnType::RowId => Datum::None, + // Extract the offset from the meta data. SourceColumnType::Offset => Datum::Some(row_meta.offset.as_str().into()), - + // Extract custom meta data per connector. 
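+                    // Currently only the Kafka timestamp is exposed this way; all other
+                    // cases fall through to the parser-provided closure below.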
SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &row_meta.meta => { assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name"); kafka_meta.timestamp.map(|ts| { @@ -260,16 +272,16 @@ impl SourceStreamChunkRowWriter<'_> { }) } + // For other cases, call the inner closure. SourceColumnType::Meta | SourceColumnType::Normal => return f(desc), }; - Ok(A::output_for(datum)) } else { f(desc) } }; - let mut modify_col = Vec::with_capacity(self.descs.len()); + let mut modified = Vec::with_capacity(self.descs.len()); let result = self .descs @@ -278,7 +290,7 @@ impl SourceStreamChunkRowWriter<'_> { .try_for_each(|(desc, builder)| -> Result<()> { let output = f(desc)?; A::apply(builder, output); - modify_col.push(builder); + modified.push(builder); Ok(()) }); @@ -289,7 +301,7 @@ impl SourceStreamChunkRowWriter<'_> { } Err(e) => { tracing::warn!("failed to parse source data: {}", e); - for builder in modify_col { + for builder in modified { A::rollback(builder); } Err(e) @@ -338,39 +350,6 @@ impl SourceStreamChunkRowWriter<'_> { } } -impl SourceStreamChunkRowWriter<'_> { - /// For other op like 'insert', 'update', 'delete', we will leave the hollow for the meta column - /// builder. e.g after insert - /// `data_builder = [1], meta_column_builder = [], op = [insert]` - /// - /// This function is used to fulfill this hollow in `meta_column_builder`. - /// e.g after fulfill - /// `data_builder = [1], meta_column_builder = [1], op = [insert]` - #[deprecated] - pub fn fulfill_meta_column( - &mut self, - mut f: impl FnMut(&SourceColumnDesc) -> Option, - ) -> Result { - self.descs - .iter() - .zip_eq_fast(self.builders.iter_mut()) - .for_each(|(desc, builder)| { - if let Some(output) = f(desc) { - builder.append(output); - } - }); - - Ok(WriteGuard(())) - } - - fn with_meta(self, row_meta: RowMeta) -> Self { - Self { - row_meta: Some(row_meta), - ..self - } - } -} - /// Transaction control message. Currently only used by Debezium messages. pub enum TransactionControl { Begin { id: Box }, @@ -480,8 +459,7 @@ async fn into_chunk_stream(mut parser: P, data_stream continue; } - let msg_offset = msg.offset; - split_offset_mapping.insert(msg.split_id, msg_offset.clone()); + split_offset_mapping.insert(msg.split_id, msg.offset.clone()); let old_op_num = builder.op_num(); match parser @@ -490,7 +468,7 @@ async fn into_chunk_stream(mut parser: P, data_stream msg.payload, builder.row_writer().with_meta(RowMeta { meta: msg.meta, - offset: msg_offset, + offset: msg.offset, }), ) .await @@ -503,38 +481,6 @@ async fn into_chunk_stream(mut parser: P, data_stream if let Some(Transaction { len, .. 
}) = &mut current_transaction { *len += new_op_num - old_op_num; } - - // // fill in meta column for specific source and offset column if needed - // for _ in old_op_num..new_op_num { - // let f = - // |desc: &SourceColumnDesc| -> Option { - // if desc.is_meta() && let SourceMeta::Kafka(kafka_meta) = &msg.meta { - // match desc.name.as_str() { - // KAFKA_TIMESTAMP_COLUMN_NAME => { - // Some(kafka_meta.timestamp.map(|ts| { - // risingwave_common::cast::i64_to_timestamptz(ts) - // .unwrap() - // .into() - // })) - // } - // _ => { - // unreachable!( - // "kafka will not have this meta column: {}", - // desc.name - // ) - // } - // } - // } else if desc.is_offset() { - // Some(Some(msg_offset.as_str().into())) - // } else { - // // None will be ignored by `fulfill_meta_column` - // None - // } - // }; - - // // // fill in meta or offset column if any - // // builder.row_writer().fulfill_meta_column(f)?; - // } } Ok(Either::Right(txn_ctl)) => { From b507a9109d6f658bfa0bfd3e76fdcb890f8da24f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 26 Sep 2023 18:03:02 +0800 Subject: [PATCH 3/7] remove write guard Signed-off-by: Bugen Zhao --- .../src/parser/canal/simd_json_parser.rs | 8 ++-- src/connector/src/parser/csv_parser.rs | 6 +-- .../src/parser/debezium/debezium_parser.rs | 14 +++--- .../src/parser/debezium/mongo_json_parser.rs | 6 +-- src/connector/src/parser/json_parser.rs | 8 ++-- .../src/parser/maxwell/maxwell_parser.rs | 6 +-- src/connector/src/parser/mod.rs | 48 ++++++++----------- src/connector/src/parser/plain_parser.rs | 6 +-- src/connector/src/parser/unified/util.rs | 12 ++--- src/connector/src/parser/upsert_parser.rs | 6 +-- src/connector/src/parser/util.rs | 3 +- 11 files changed, 53 insertions(+), 70 deletions(-) diff --git a/src/connector/src/parser/canal/simd_json_parser.rs b/src/connector/src/parser/canal/simd_json_parser.rs index 8d2d03bf1611c..fb70411254196 100644 --- a/src/connector/src/parser/canal/simd_json_parser.rs +++ b/src/connector/src/parser/canal/simd_json_parser.rs @@ -21,9 +21,7 @@ use crate::parser::canal::operators::*; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::unified::ChangeEventOperation; -use crate::parser::{ - ByteStreamSourceParser, JsonProperties, SourceStreamChunkRowWriter, WriteGuard, -}; +use crate::parser::{ByteStreamSourceParser, JsonProperties, SourceStreamChunkRowWriter}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; const DATA: &str = "data"; @@ -55,7 +53,7 @@ impl CanalJsonParser { &self, mut payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..]) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; @@ -128,7 +126,7 @@ impl ByteStreamSourceParser for CanalJsonParser { _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index b159539b031d8..648341130665c 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -22,7 +22,7 @@ use risingwave_common::types::{Datum, Decimal, ScalarImpl, Timestamptz}; use super::{ByteStreamSourceParser, CsvProperties}; use crate::only_parse_payload; -use 
crate::parser::{SourceStreamChunkRowWriter, WriteGuard}; +use crate::parser::SourceStreamChunkRowWriter; use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef}; macro_rules! to_rust_type { @@ -108,7 +108,7 @@ impl CsvParser { &mut self, payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let mut fields = self.read_row(&payload)?; if let Some(headers) = &mut self.headers { if headers.is_empty() { @@ -158,7 +158,7 @@ impl ByteStreamSourceParser for CsvParser { _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 8fd0f6f2ba4f6..e98a57650d308 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use itertools::Either; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; @@ -23,8 +22,7 @@ use crate::parser::unified::debezium::DebeziumChangeEvent; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{ AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, JsonProperties, - ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig, TransactionControl, - WriteGuard, + ParseResult, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig, }; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -93,7 +91,7 @@ impl DebeziumParser { key: Option>, payload: Option>, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result> { + ) -> Result { // tombetone messages are handled implicitly by these accessors let key_accessor = match key { None => None, @@ -106,12 +104,12 @@ impl DebeziumParser { let row_op = DebeziumChangeEvent::new(key_accessor, payload_accessor); match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) { - Ok(guard) => Ok(Either::Left(guard)), + Ok(_) => Ok(ParseResult::Rows), Err(err) => { // Only try to access transaction control message if the row operation access failed // to make it a fast path. 
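                // (A transaction marker carries no row data, so writing it as a
                // row fails first and we fall back to this check.)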
if let Ok(transaction_control) = row_op.transaction_control() { - Ok(Either::Right(transaction_control)) + Ok(ParseResult::TransactionControl(transaction_control)) } else { Err(err) } @@ -135,7 +133,7 @@ impl ByteStreamSourceParser for DebeziumParser { _key: Option>, _payload: Option>, _writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { unreachable!("should call `parse_one_with_txn` instead") } @@ -144,7 +142,7 @@ impl ByteStreamSourceParser for DebeziumParser { key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result> { + ) -> Result { self.parse_inner(key, payload, writer).await } } diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index 910b5a3e0131d..4b6478f3d73c1 100644 --- a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -23,7 +23,7 @@ use crate::only_parse_payload; use crate::parser::unified::debezium::{DebeziumChangeEvent, MongoProjeciton}; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; -use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; +use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; #[derive(Debug)] @@ -82,7 +82,7 @@ impl DebeziumMongoJsonParser { &self, mut payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; @@ -117,7 +117,7 @@ impl ByteStreamSourceParser for DebeziumMongoJsonParser { _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 027a052fe73dd..6568f2eee40a3 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -31,9 +31,7 @@ use crate::parser::schema_registry::handle_sr_list; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; use crate::parser::unified::util::apply_row_accessor_on_stream_chunk_writer; use crate::parser::unified::AccessImpl; -use crate::parser::{ - AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard, -}; +use crate::parser::{AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; #[derive(Debug)] @@ -109,7 +107,7 @@ impl JsonParser { &self, mut payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let value = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..]) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; let values = if let simd_json::BorrowedValue::Array(arr) = value { @@ -189,7 +187,7 @@ impl ByteStreamSourceParser for JsonParser { _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/maxwell/maxwell_parser.rs b/src/connector/src/parser/maxwell/maxwell_parser.rs index fff28092fa451..0980472021bff 100644 --- a/src/connector/src/parser/maxwell/maxwell_parser.rs +++ 
b/src/connector/src/parser/maxwell/maxwell_parser.rs @@ -19,7 +19,7 @@ use crate::parser::unified::maxwell::MaxwellChangeEvent; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{ AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, - SourceStreamChunkRowWriter, SpecificParserConfig, WriteGuard, + SourceStreamChunkRowWriter, SpecificParserConfig, }; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -57,7 +57,7 @@ impl MaxwellParser { &mut self, payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let payload_accessor = self.payload_builder.generate_accessor(payload).await?; let row_op = MaxwellChangeEvent::new(payload_accessor); @@ -79,7 +79,7 @@ impl ByteStreamSourceParser for MaxwellParser { _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { // restrict the behaviours since there is no corresponding // key/value test for maxwell yet. only_parse_payload!(self, payload, writer) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index af788f1ca6f85..d0a877f4fe9b3 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -22,7 +22,6 @@ use csv_parser::CsvParser; pub use debezium::*; use futures::{Future, TryFutureExt}; use futures_async_stream::try_stream; -use itertools::Either; pub use json_parser::*; pub use protobuf::*; use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk}; @@ -141,12 +140,6 @@ pub struct SourceStreamChunkRowWriter<'a> { row_meta: Option, } -/// `WriteGuard` can't be constructed directly in other mods due to a private field, so it can be -/// used to ensure that all methods on [`SourceStreamChunkRowWriter`] are called at least once in -/// the `SourceParser::parse` implementation. -#[derive(Debug)] -pub struct WriteGuard(()); - trait OpAction { type Output; @@ -253,7 +246,7 @@ impl SourceStreamChunkRowWriter<'_> { fn do_action( &mut self, mut f: impl FnMut(&SourceColumnDesc) -> Result, - ) -> Result { + ) -> Result<()> { let mut f = |desc: &SourceColumnDesc| { if let Some(row_meta) = &self.row_meta { let datum = match desc.column_type { @@ -297,7 +290,7 @@ impl SourceStreamChunkRowWriter<'_> { match result { Ok(_) => { A::finish(self); - Ok(WriteGuard(())) + Ok(()) } Err(e) => { tracing::warn!("failed to parse source data: {}", e); @@ -313,12 +306,8 @@ impl SourceStreamChunkRowWriter<'_> { /// /// # Arguments /// - /// * `self`: Ownership is consumed so only one record can be written. /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`]. - pub fn insert( - &mut self, - f: impl FnMut(&SourceColumnDesc) -> Result, - ) -> Result { + pub fn insert(&mut self, f: impl FnMut(&SourceColumnDesc) -> Result) -> Result<()> { self.do_action::(f) } @@ -326,12 +315,8 @@ impl SourceStreamChunkRowWriter<'_> { /// /// # Arguments /// - /// * `self`: Ownership is consumed so only one record can be written. /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`]. - pub fn delete( - &mut self, - f: impl FnMut(&SourceColumnDesc) -> Result, - ) -> Result { + pub fn delete(&mut self, f: impl FnMut(&SourceColumnDesc) -> Result) -> Result<()> { self.do_action::(f) } @@ -339,13 +324,12 @@ impl SourceStreamChunkRowWriter<'_> { /// /// # Arguments /// - /// * `self`: Ownership is consumed so only one record can be written. 
/// * `f`: A failable closure that produced two [`Datum`]s as old and new value by corresponding /// [`SourceColumnDesc`]. pub fn update( &mut self, f: impl FnMut(&SourceColumnDesc) -> Result<(Datum, Datum)>, - ) -> Result { + ) -> Result<()> { self.do_action::(f) } } @@ -356,6 +340,11 @@ pub enum TransactionControl { Commit { id: Box }, } +pub enum ParseResult { + Rows, + TransactionControl(TransactionControl), +} + /// `ByteStreamSourceParser` is a new message parser, the parser should consume /// the input data stream and return a stream of parsed msgs. pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { @@ -371,7 +360,7 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> impl Future> + Send + 'a; + ) -> impl Future> + Send + 'a; /// Parse one record from the given `payload`, either write it to the `writer` or interpret it /// as a transaction control message. @@ -383,8 +372,9 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> impl Future>> + Send + 'a { - self.parse_one(key, payload, writer).map_ok(Either::Left) + ) -> impl Future> + Send + 'a { + self.parse_one(key, payload, writer) + .map_ok(|_| ParseResult::Rows) } /// Parse a data stream of one source split into a stream of [`StreamChunk`]. @@ -473,17 +463,17 @@ async fn into_chunk_stream(mut parser: P, data_stream ) .await { - Ok(Either::Left(WriteGuard(_))) => { - // new_op_num - old_op_num is the number of rows added to the builder - let new_op_num = builder.op_num(); + Ok(ParseResult::Rows) => { + // The number of rows added to the builder. + let num = builder.op_num() - old_op_num; // Aggregate the number of rows in the current transaction. if let Some(Transaction { len, .. }) = &mut current_transaction { - *len += new_op_num - old_op_num; + *len += num; } } - Ok(Either::Right(txn_ctl)) => { + Ok(ParseResult::TransactionControl(txn_ctl)) => { match txn_ctl { TransactionControl::Begin { id } => { if let Some(Transaction { id: current_id, .. 
}) = ¤t_transaction { diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index b50ab0e322ba3..53f87660de031 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -18,7 +18,7 @@ use risingwave_common::error::{ErrorCode, Result, RwError}; use super::unified::util::apply_row_accessor_on_stream_chunk_writer; use super::{ AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, - SourceStreamChunkRowWriter, SpecificParserConfig, WriteGuard, + SourceStreamChunkRowWriter, SpecificParserConfig, }; use crate::only_parse_payload; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -59,7 +59,7 @@ impl PlainParser { &mut self, payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let accessor = self.payload_builder.generate_accessor(payload).await?; apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer) @@ -80,7 +80,7 @@ impl ByteStreamSourceParser for PlainParser { _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/unified/util.rs b/src/connector/src/parser/unified/util.rs index 0f84e8d7c5d07..38b83a3fc00d2 100644 --- a/src/connector/src/parser/unified/util.rs +++ b/src/connector/src/parser/unified/util.rs @@ -17,12 +17,12 @@ use risingwave_common::types::Datum; use super::{Access, AccessError, ChangeEvent}; use crate::parser::unified::ChangeEventOperation; -use crate::parser::{SourceStreamChunkRowWriter, WriteGuard}; +use crate::parser::SourceStreamChunkRowWriter; pub fn apply_delete_on_stream_chunk_writer( row_op: impl ChangeEvent, writer: &mut SourceStreamChunkRowWriter<'_>, -) -> std::result::Result { +) -> std::result::Result<(), RwError> { writer.delete(|column| { let res = row_op.access_field(&column.name, &column.data_type); match res { @@ -43,7 +43,7 @@ pub fn apply_delete_on_stream_chunk_writer( pub fn apply_upsert_on_stream_chunk_writer( row_op: impl ChangeEvent, writer: &mut SourceStreamChunkRowWriter<'_>, -) -> std::result::Result { +) -> std::result::Result<(), RwError> { writer.insert(|column| { let res = match row_op.access_field(&column.name, &column.data_type) { Ok(o) => Ok(o), @@ -69,7 +69,7 @@ pub fn apply_row_operation_on_stream_chunk_writer_with_op( row_op: impl ChangeEvent, writer: &mut SourceStreamChunkRowWriter<'_>, op: ChangeEventOperation, -) -> std::result::Result { +) -> std::result::Result<(), RwError> { match op { ChangeEventOperation::Upsert => apply_upsert_on_stream_chunk_writer(row_op, writer), ChangeEventOperation::Delete => apply_delete_on_stream_chunk_writer(row_op, writer), @@ -79,7 +79,7 @@ pub fn apply_row_operation_on_stream_chunk_writer_with_op( pub fn apply_row_operation_on_stream_chunk_writer( row_op: impl ChangeEvent, writer: &mut SourceStreamChunkRowWriter<'_>, -) -> std::result::Result { +) -> std::result::Result<(), RwError> { let op = row_op.op()?; apply_row_operation_on_stream_chunk_writer_with_op(row_op, writer, op) } @@ -87,7 +87,7 @@ pub fn apply_row_operation_on_stream_chunk_writer( pub fn apply_row_accessor_on_stream_chunk_writer( accessor: impl Access, writer: &mut SourceStreamChunkRowWriter<'_>, -) -> Result { +) -> Result<(), RwError> { writer.insert(|column| { let res: Result = match accessor .access(&[&column.name], Some(&column.data_type)) diff --git a/src/connector/src/parser/upsert_parser.rs 
b/src/connector/src/parser/upsert_parser.rs index a9fce2f991475..381e429565d11 100644 --- a/src/connector/src/parser/upsert_parser.rs +++ b/src/connector/src/parser/upsert_parser.rs @@ -22,7 +22,7 @@ use super::unified::util::apply_row_operation_on_stream_chunk_writer_with_op; use super::unified::{AccessImpl, ChangeEventOperation}; use super::{ AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties, EncodingType, - SourceStreamChunkRowWriter, SpecificParserConfig, WriteGuard, + SourceStreamChunkRowWriter, SpecificParserConfig, }; use crate::extract_key_config; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -100,7 +100,7 @@ impl UpsertParser { key: Option>, payload: Option>, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let mut row_op: UpsertChangeEvent, AccessImpl<'_, '_>> = UpsertChangeEvent::default(); let mut change_event_op = ChangeEventOperation::Delete; @@ -134,7 +134,7 @@ impl ByteStreamSourceParser for UpsertParser { key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { self.parse_inner(key, payload, writer).await } } diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index 7444fe202de46..b4b36ed4b7cc1 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -24,7 +24,6 @@ use risingwave_common::error::{Result, RwError}; use crate::aws_auth::AwsAuthProps; use crate::aws_utils::{default_conn_config, s3_client}; -use crate::parser::WriteGuard; const AVRO_SCHEMA_LOCATION_S3_REGION: &str = "region"; @@ -71,7 +70,7 @@ pub(super) async fn download_from_http(location: &Url) -> Result { // if all ok, return ok // if part of them are errors, log err and return ok #[inline] -pub(super) fn at_least_one_ok(mut results: Vec>) -> Result { +pub(super) fn at_least_one_ok(mut results: Vec>) -> Result<()> { let errors = results .iter() .filter_map(|r| r.as_ref().err()) From 95d062721ea4de5d10abbcbba0aa697bb80d285b Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 26 Sep 2023 18:14:02 +0800 Subject: [PATCH 4/7] refine docs Signed-off-by: Bugen Zhao --- src/connector/src/parser/mod.rs | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index d0a877f4fe9b3..a7cdac79160c7 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -126,18 +126,25 @@ impl SourceStreamChunkBuilder { } } -pub struct RowMeta { - meta: SourceMeta, - offset: String, -} - -/// `SourceStreamChunkRowWriter` is responsible to write one row (Insert/Delete) or two rows -/// (Update) to the [`StreamChunk`]. +/// `SourceStreamChunkRowWriter` is responsible to write one or more records to the [`StreamChunk`], +/// where each contains either one row (Insert/Delete) or two rows (Update) that can be written atomically. pub struct SourceStreamChunkRowWriter<'a> { descs: &'a [SourceColumnDesc], builders: &'a mut [ArrayBuilderImpl], op_builder: &'a mut Vec, - row_meta: Option, + + /// An optional meta data of the original message. + /// + /// When this is set by `with_meta`, it'll be used to fill the columns of types other than [`SourceColumnType::Normal`]. + row_meta: Option, +} + +/// The meta data of the original message for a row writer. +/// +/// Extracted from the `SourceMessage`. 
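+///
+/// Attached to the row writer via `with_meta` and consulted once per column
+/// while writing a row.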
+pub struct MessageMeta { + meta: SourceMeta, + offset: String, } trait OpAction { @@ -234,7 +241,10 @@ impl OpAction for OpActionUpdate { } impl SourceStreamChunkRowWriter<'_> { - fn with_meta(self, row_meta: RowMeta) -> Self { + /// Set the meta data of the original message for this row writer. + /// + /// This should always be called except for tests. + fn with_meta(self, row_meta: MessageMeta) -> Self { Self { row_meta: Some(row_meta), ..self @@ -340,8 +350,11 @@ pub enum TransactionControl { Commit { id: Box }, } +/// The result of parsing a message. pub enum ParseResult { + /// Some rows are parsed and written to the [`SourceStreamChunkRowWriter`]. Rows, + /// A transaction control message is parsed. TransactionControl(TransactionControl), } @@ -456,7 +469,7 @@ async fn into_chunk_stream(mut parser: P, data_stream .parse_one_with_txn( msg.key, msg.payload, - builder.row_writer().with_meta(RowMeta { + builder.row_writer().with_meta(MessageMeta { meta: msg.meta, offset: msg.offset, }), From 6810672ae39cf8d3a7f8d07d4b5291baf18dff8b Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 26 Sep 2023 18:32:17 +0800 Subject: [PATCH 5/7] fix typo Signed-off-by: Bugen Zhao --- src/connector/src/parser/common.rs | 2 +- src/connector/src/parser/mod.rs | 4 +++- src/connector/src/parser/unified/json.rs | 6 +++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/connector/src/parser/common.rs b/src/connector/src/parser/common.rs index 2aed3c5f7927f..5a288dfd80b8d 100644 --- a/src/connector/src/parser/common.rs +++ b/src/connector/src/parser/common.rs @@ -17,7 +17,7 @@ use simd_json::{BorrowedValue, ValueAccess}; /// Get a value from a json object by key, case insensitive. /// /// Returns `None` if the given json value is not an object, or the key is not found. -pub(crate) fn json_object_get_case_insentive<'a, 'b>( +pub(crate) fn json_object_get_case_insensitive<'a, 'b>( v: &'b simd_json::BorrowedValue<'a>, key: &'b str, ) -> Option<&'b BorrowedValue<'a>> { diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index a7cdac79160c7..b37b159d8aae9 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -317,6 +317,7 @@ impl SourceStreamChunkRowWriter<'_> { /// # Arguments /// /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`]. + /// Callers only need to handle columns with the type [`SourceColumnType::Normal`]. pub fn insert(&mut self, f: impl FnMut(&SourceColumnDesc) -> Result) -> Result<()> { self.do_action::(f) } @@ -326,6 +327,7 @@ impl SourceStreamChunkRowWriter<'_> { /// # Arguments /// /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`]. + /// Callers only need to handle columns with the type [`SourceColumnType::Normal`]. pub fn delete(&mut self, f: impl FnMut(&SourceColumnDesc) -> Result) -> Result<()> { self.do_action::(f) } @@ -335,7 +337,7 @@ impl SourceStreamChunkRowWriter<'_> { /// # Arguments /// /// * `f`: A failable closure that produced two [`Datum`]s as old and new value by corresponding - /// [`SourceColumnDesc`]. + /// [`SourceColumnDesc`]. Callers only need to handle columns with the type [`SourceColumnType::Normal`]. 
pub fn update( &mut self, f: impl FnMut(&SourceColumnDesc) -> Result<(Datum, Datum)>, diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 0f50b540690be..35cac86c34e55 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -28,7 +28,7 @@ use risingwave_common::util::iter_util::ZipEqFast; use simd_json::{BorrowedValue, TryTypeError, ValueAccess, ValueType}; use super::{Access, AccessError, AccessResult}; -use crate::parser::common::json_object_get_case_insentive; +use crate::parser::common::json_object_get_case_insensitive; use crate::parser::unified::avro::extract_decimal; #[derive(Clone, Debug)] @@ -441,7 +441,7 @@ impl JsonParseOptions { .zip_eq_fast(struct_type_info.types()) .map(|(field_name, field_type)| { self.parse( - json_object_get_case_insentive(value, field_name) + json_object_get_case_insensitive(value, field_name) .unwrap_or(&BorrowedValue::Static(simd_json::StaticNode::Null)), Some(field_type), ) @@ -558,7 +558,7 @@ where let mut value = &self.value; for (idx, &key) in path.iter().enumerate() { if let Some(sub_value) = if self.options.ignoring_keycase { - json_object_get_case_insentive(value, key) + json_object_get_case_insensitive(value, key) } else { value.get(key) } { From 69ea134b9468fb8bb78d7f375bdccfee656a91b5 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 26 Sep 2023 18:33:14 +0800 Subject: [PATCH 6/7] revert changes on logger Signed-off-by: Bugen Zhao --- src/utils/runtime/src/logger.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index 58070c6d786e8..4a4b77936b800 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -36,13 +36,17 @@ fn configure_risingwave_targets_fmt(targets: filter::Targets) -> filter::Targets targets // force a lower level for important logs .with_target("risingwave_stream", Level::DEBUG) + .with_target("risingwave_storage", Level::DEBUG) // force a higher level for noisy logs - .with_target("risingwave_storage", Level::INFO) .with_target("risingwave_sqlparser", Level::INFO) .with_target("pgwire", Level::INFO) .with_target(PGWIRE_QUERY_LOG, Level::OFF) // force a higher level for foyer logs .with_target("foyer", Level::WARN) + .with_target("foyer_common", Level::WARN) + .with_target("foyer_intrusive", Level::WARN) + .with_target("foyer_memory", Level::WARN) + .with_target("foyer_storage", Level::WARN) // disable events that are too verbose .with_target("events", Level::ERROR) } From f9b07af6e44b02807713de07a11e8371a7567ba7 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 27 Sep 2023 12:15:47 +0800 Subject: [PATCH 7/7] refactor meta value method and closure name Signed-off-by: Bugen Zhao --- src/connector/src/parser/mod.rs | 75 ++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 33 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index b37b159d8aae9..8c7914fe16354 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -28,7 +28,7 @@ use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk}; use risingwave_common::catalog::KAFKA_TIMESTAMP_COLUMN_NAME; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::Datum; +use risingwave_common::types::{Datum, Scalar}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::catalog::{ 
SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo, @@ -147,6 +147,33 @@ pub struct MessageMeta { offset: String, } +impl MessageMeta { + /// Extract the value for the given column. + /// + /// Returns `None` if the column is not a meta column. + fn value_for_column(&self, desc: &SourceColumnDesc) -> Option { + match desc.column_type { + // Row id columns are filled with `NULL` here and will be filled with the real + // row id generated by `RowIdGenExecutor` later. + SourceColumnType::RowId => Datum::None.into(), + // Extract the offset from the meta data. + SourceColumnType::Offset => Datum::Some(self.offset.as_str().into()).into(), + // Extract custom meta data per connector. + SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &self.meta => { + assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name"); + kafka_meta.timestamp.map(|ts| { + risingwave_common::cast::i64_to_timestamptz(ts) + .unwrap() + .to_scalar_value() + }).into() + } + + // For other cases, return `None`. + SourceColumnType::Meta | SourceColumnType::Normal => None, + } + } +} + trait OpAction { type Output; @@ -257,44 +284,26 @@ impl SourceStreamChunkRowWriter<'_> { &mut self, mut f: impl FnMut(&SourceColumnDesc) -> Result, ) -> Result<()> { - let mut f = |desc: &SourceColumnDesc| { - if let Some(row_meta) = &self.row_meta { - let datum = match desc.column_type { - // Row id columns are filled with `NULL` here and will be filled with the real - // row id generated by `RowIdGenExecutor` later. - SourceColumnType::RowId => Datum::None, - // Extract the offset from the meta data. - SourceColumnType::Offset => Datum::Some(row_meta.offset.as_str().into()), - // Extract custom meta data per connector. - SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &row_meta.meta => { - assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name"); - kafka_meta.timestamp.map(|ts| { - risingwave_common::cast::i64_to_timestamptz(ts) - .unwrap() - .into() - }) - } - - // For other cases, call the inner closure. - SourceColumnType::Meta | SourceColumnType::Normal => return f(desc), - }; - Ok(A::output_for(datum)) + let mut f_with_meta = |desc: &SourceColumnDesc| { + if let Some(meta_value) = + (self.row_meta.as_ref()).and_then(|row_meta| row_meta.value_for_column(desc)) + { + Ok(A::output_for(meta_value)) } else { f(desc) } }; - let mut modified = Vec::with_capacity(self.descs.len()); + // Columns that changes have been applied to. Used to rollback when an error occurs. + let mut applied_columns = Vec::with_capacity(self.descs.len()); - let result = self - .descs - .iter() + let result = (self.descs.iter()) .zip_eq_fast(self.builders.iter_mut()) - .try_for_each(|(desc, builder)| -> Result<()> { - let output = f(desc)?; - A::apply(builder, output); - modified.push(builder); - Ok(()) + .try_for_each(|(desc, builder)| { + f_with_meta(desc).map(|output| { + A::apply(builder, output); + applied_columns.push(builder); + }) }); match result { @@ -304,7 +313,7 @@ impl SourceStreamChunkRowWriter<'_> { } Err(e) => { tracing::warn!("failed to parse source data: {}", e); - for builder in modified { + for builder in applied_columns { A::rollback(builder); } Err(e)
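This last patch centralizes the dispatch: `MessageMeta::value_for_column` decides whether a column can be filled from the message meta data, and `do_action`'s `f_with_meta` falls back to the parser's closure only when it cannot. A condensed, runnable sketch of that control flow, using simplified stand-in types rather than the real RisingWave definitions:

    // Stand-ins for the real types; `Datum` is simplified to `Option<String>`.
    type Datum = Option<String>;

    #[allow(dead_code)] // variants mirror the real `SourceColumnType`
    enum SourceColumnType { RowId, Offset, Meta, Normal }

    struct SourceColumnDesc { column_type: SourceColumnType }

    struct MessageMeta { offset: String }

    impl MessageMeta {
        /// `Some(datum)` if the meta data can fill this column directly,
        /// `None` if the value must come from the parsed payload.
        fn value_for_column(&self, desc: &SourceColumnDesc) -> Option<Datum> {
            match desc.column_type {
                // Left NULL here; filled by the row-id generator downstream.
                SourceColumnType::RowId => Some(None),
                SourceColumnType::Offset => Some(Some(self.offset.clone())),
                // Connector-specific meta (e.g. the Kafka timestamp) would be
                // handled here; everything else falls back to the parser.
                SourceColumnType::Meta | SourceColumnType::Normal => None,
            }
        }
    }

    /// Mirrors the `f_with_meta` closure in `do_action`.
    fn fill_column(
        meta: Option<&MessageMeta>,
        desc: &SourceColumnDesc,
        parse: impl Fn(&SourceColumnDesc) -> Datum,
    ) -> Datum {
        match meta.and_then(|m| m.value_for_column(desc)) {
            Some(datum) => datum, // meta column: no parsing needed
            None => parse(desc),  // normal column: ask the parser
        }
    }

    fn main() {
        let meta = MessageMeta { offset: "42".to_owned() };
        let offset_col = SourceColumnDesc { column_type: SourceColumnType::Offset };
        let normal_col = SourceColumnDesc { column_type: SourceColumnType::Normal };

        let parse = |_: &SourceColumnDesc| Some("from payload".to_owned());
        assert_eq!(fill_column(Some(&meta), &offset_col, parse), Some("42".to_owned()));
        assert_eq!(fill_column(Some(&meta), &normal_col, parse), Some("from payload".to_owned()));
    }

In the real code the value additionally passes through `OpAction::output_for` to match the op being written, and on error every column already applied is rolled back from its builder, as shown in the `Err(e)` branch above.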