Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(connector): remove WriteGuard and fulfill_meta_column from parser #12542

Merged
merged 7 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/connector/benches/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async fn parse(parser: JsonParser, column_desc: Vec<SourceColumnDesc>, input: Ve
SourceStreamChunkBuilder::with_capacity(column_desc.clone(), input_inner.len());
for payload in input_inner {
let row_writer = builder.row_writer();
parser.parse_inner(Some(payload), row_writer).await.unwrap();
parser.parse_inner(payload, row_writer).await.unwrap();
}
builder.finish();
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#![feature(associated_type_defaults)]
#![feature(impl_trait_in_assoc_type)]
#![feature(iter_from_generator)]
#![feature(if_let_guard)]

use std::time::Duration;

Expand Down
8 changes: 3 additions & 5 deletions src/connector/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ use crate::parser::canal::operators::*;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::unified::ChangeEventOperation;
use crate::parser::{
ByteStreamSourceParser, JsonProperties, SourceStreamChunkRowWriter, WriteGuard,
};
use crate::parser::{ByteStreamSourceParser, JsonProperties, SourceStreamChunkRowWriter};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

const DATA: &str = "data";
Expand Down Expand Up @@ -55,7 +53,7 @@ impl CanalJsonParser {
&self,
mut payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
) -> Result<()> {
let mut event: BorrowedValue<'_> =
simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
Expand Down Expand Up @@ -128,7 +126,7 @@ impl ByteStreamSourceParser for CanalJsonParser {
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
) -> Result<()> {
only_parse_payload!(self, payload, writer)
}
}
Expand Down
15 changes: 8 additions & 7 deletions src/connector/src/parser/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;

use simd_json::{BorrowedValue, ValueAccess};

pub(crate) fn json_object_smart_get_value<'a, 'b>(
/// Get a value from a json object by key, case insensitive.
///
/// Returns `None` if the given json value is not an object, or the key is not found.
pub(crate) fn json_object_get_case_insensitive<'a, 'b>(
v: &'b simd_json::BorrowedValue<'a>,
key: Cow<'b, str>,
key: &'b str,
) -> Option<&'b BorrowedValue<'a>> {
let obj = v.as_object()?;
let value = obj.get(key.as_ref());
let value = obj.get(key);
if value.is_some() {
return value;
return value; // fast path
}
for (k, v) in obj {
if k.eq_ignore_ascii_case(key.as_ref()) {
if k.eq_ignore_ascii_case(key) {
return Some(v);
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/parser/csv_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::types::{Datum, Decimal, ScalarImpl, Timestamptz};

use super::{ByteStreamSourceParser, CsvProperties};
use crate::only_parse_payload;
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
use crate::parser::SourceStreamChunkRowWriter;
use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef};

macro_rules! to_rust_type {
Expand Down Expand Up @@ -108,14 +108,14 @@ impl CsvParser {
&mut self,
payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
) -> Result<()> {
let mut fields = self.read_row(&payload)?;
if let Some(headers) = &mut self.headers {
if headers.is_empty() {
*headers = fields;
// Here we want a row, but got nothing. So it's an error for the `parse_inner` but
// has no bad impact on the system.
return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. However, internal parser state was updated.".to_string())));
return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. However, internal parser state was updated.".to_string())));
}
writer.insert(|desc| {
if let Some(i) = headers.iter().position(|name| name == &desc.name) {
Expand Down Expand Up @@ -158,7 +158,7 @@ impl ByteStreamSourceParser for CsvParser {
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
) -> Result<()> {
only_parse_payload!(self, payload, writer)
}
}
Expand Down
14 changes: 6 additions & 8 deletions src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Either;
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};

Expand All @@ -23,8 +22,7 @@ use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, JsonProperties,
ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig, TransactionControl,
WriteGuard,
ParseResult, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

Expand Down Expand Up @@ -93,7 +91,7 @@ impl DebeziumParser {
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<Either<WriteGuard, TransactionControl>> {
) -> Result<ParseResult> {
// tombetone messages are handled implicitly by these accessors
let key_accessor = match key {
None => None,
Expand All @@ -106,12 +104,12 @@ impl DebeziumParser {
let row_op = DebeziumChangeEvent::new(key_accessor, payload_accessor);

match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) {
Ok(guard) => Ok(Either::Left(guard)),
Ok(_) => Ok(ParseResult::Rows),
Err(err) => {
// Only try to access transaction control message if the row operation access failed
// to make it a fast path.
if let Ok(transaction_control) = row_op.transaction_control() {
Ok(Either::Right(transaction_control))
Ok(ParseResult::TransactionControl(transaction_control))
} else {
Err(err)
}
Expand All @@ -135,7 +133,7 @@ impl ByteStreamSourceParser for DebeziumParser {
_key: Option<Vec<u8>>,
_payload: Option<Vec<u8>>,
_writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
) -> Result<()> {
unreachable!("should call `parse_one_with_txn` instead")
}

Expand All @@ -144,7 +142,7 @@ impl ByteStreamSourceParser for DebeziumParser {
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<Either<WriteGuard, TransactionControl>> {
) -> Result<ParseResult> {
self.parse_inner(key, payload, writer).await
}
}
6 changes: 3 additions & 3 deletions src/connector/src/parser/debezium/mongo_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::only_parse_payload;
use crate::parser::unified::debezium::{DebeziumChangeEvent, MongoProjeciton};
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard};
use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
Expand Down Expand Up @@ -82,7 +82,7 @@ impl DebeziumMongoJsonParser {
&self,
mut payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
) -> Result<()> {
let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;

Expand Down Expand Up @@ -117,7 +117,7 @@ impl ByteStreamSourceParser for DebeziumMongoJsonParser {
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
) -> Result<()> {
only_parse_payload!(self, payload, writer)
}
}
Expand Down
35 changes: 14 additions & 21 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ use super::avro::schema_resolver::ConfluentSchemaResolver;
use super::schema_registry::Client;
use super::util::{get_kafka_topic, read_schema_from_http, read_schema_from_local};
use super::{EncodingProperties, SchemaRegistryAuth, SpecificParserConfig};
use crate::only_parse_payload;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::schema_registry::handle_sr_list;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_accessor_on_stream_chunk_writer;
use crate::parser::unified::AccessImpl;
use crate::parser::{
AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard,
};
use crate::parser::{AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
Expand Down Expand Up @@ -106,17 +105,11 @@ impl JsonParser {
#[allow(clippy::unused_async)]
pub async fn parse_inner(
&self,
mut payload: Option<Vec<u8>>,
mut payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
if payload.is_none() {
return Err(RwError::from(ErrorCode::InternalError(
"Empty payload with nonempty key for non-upsert".into(),
)));
}
let value =
simd_json::to_borrowed_value(&mut payload.as_mut().unwrap()[self.payload_start_idx..])
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
) -> Result<()> {
let value = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
let values = if let simd_json::BorrowedValue::Array(arr) = value {
arr
} else {
Expand Down Expand Up @@ -194,8 +187,8 @@ impl ByteStreamSourceParser for JsonParser {
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
self.parse_inner(payload, writer).await
) -> Result<()> {
only_parse_payload!(self, payload, writer)
}
}

Expand Down Expand Up @@ -257,7 +250,7 @@ mod tests {

for payload in get_payload() {
let writer = builder.row_writer();
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}

let chunk = builder.finish();
Expand Down Expand Up @@ -362,7 +355,7 @@ mod tests {
{
let writer = builder.row_writer();
let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec();
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}

// Parse an incorrect record.
Expand All @@ -371,14 +364,14 @@ mod tests {
// `v2` overflowed.
let payload = br#"{"v1": 1, "v2": 65536, "v3": "3"}"#.to_vec();
// ignored the error, and fill None at v2.
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}

// Parse a correct record.
{
let writer = builder.row_writer();
let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec();
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}

let chunk = builder.finish();
Expand Down Expand Up @@ -448,7 +441,7 @@ mod tests {
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
{
let writer = builder.row_writer();
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
Expand Down Expand Up @@ -508,7 +501,7 @@ mod tests {
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
{
let writer = builder.row_writer();
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/maxwell/maxwell_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::parser::unified::maxwell::MaxwellChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
SourceStreamChunkRowWriter, SpecificParserConfig, WriteGuard,
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

Expand Down Expand Up @@ -57,7 +57,7 @@ impl MaxwellParser {
&mut self,
payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
) -> Result<()> {
let payload_accessor = self.payload_builder.generate_accessor(payload).await?;
let row_op = MaxwellChangeEvent::new(payload_accessor);

Expand All @@ -79,7 +79,7 @@ impl ByteStreamSourceParser for MaxwellParser {
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
) -> Result<()> {
// restrict the behaviours since there is no corresponding
// key/value test for maxwell yet.
only_parse_payload!(self, payload, writer)
Expand Down
Loading
Loading