Skip to content

Commit

Permalink
refactor(source): remove some useless indirection (#16970)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored May 28, 2024
1 parent 84b0ac2 commit 716264c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 48 deletions.
5 changes: 3 additions & 2 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ use risingwave_common::{bail, try_match_expand};
use risingwave_pb::plan_common::ColumnDesc;

use super::avro::schema_resolver::ConfluentSchemaResolver;
use super::unified::Access;
use super::util::{bytes_from_url, get_kafka_topic};
use super::{EncodingProperties, JsonProperties, SchemaRegistryAuth, SpecificParserConfig};
use crate::error::ConnectorResult;
use crate::only_parse_payload;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_accessor_on_stream_chunk_writer;
use crate::parser::unified::AccessImpl;
use crate::parser::{
AccessBuilder, ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter,
Expand Down Expand Up @@ -132,7 +132,8 @@ impl JsonParser {
let mut errors = Vec::new();
for value in values {
let accessor = JsonAccess::new(value);
match apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer) {
match writer.insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
{
Ok(_) => {}
Err(err) => errors.push(err),
}
Expand Down
16 changes: 5 additions & 11 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use risingwave_common::bail;

use super::unified::json::TimestamptzHandling;
use super::unified::ChangeEvent;
use super::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
SourceStreamChunkRowWriter, SpecificParserConfig,
Expand All @@ -24,8 +25,7 @@ use crate::parser::bytes_parser::BytesAccessBuilder;
use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
use crate::parser::unified::debezium::parse_transaction_meta;
use crate::parser::unified::upsert::UpsertChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer_with_op;
use crate::parser::unified::{AccessImpl, ChangeEventOperation};
use crate::parser::unified::AccessImpl;
use crate::parser::upsert_parser::get_key_column_name;
use crate::parser::{BytesProperties, ParseResult, ParserFormat};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
Expand Down Expand Up @@ -106,7 +106,6 @@ impl PlainParser {
// reuse upsert component but always insert
let mut row_op: UpsertChangeEvent<AccessImpl<'_, '_>, AccessImpl<'_, '_>> =
UpsertChangeEvent::default();
let change_event_op = ChangeEventOperation::Upsert;

if let Some(data) = key
&& let Some(key_builder) = self.key_builder.as_mut()
Expand All @@ -119,14 +118,9 @@ impl PlainParser {
row_op = row_op.with_value(self.payload_builder.generate_accessor(data).await?);
}

Ok(
apply_row_operation_on_stream_chunk_writer_with_op(
row_op,
&mut writer,
change_event_op,
)
.map(|_| ParseResult::Rows)?,
)
writer.insert(|column: &SourceColumnDesc| row_op.access_field(column))?;

Ok(ParseResult::Rows)
}
}

Expand Down
51 changes: 24 additions & 27 deletions src/connector/src/parser/unified/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,33 +60,30 @@ impl<K, V> UpsertChangeEvent<K, V> {
}
}

impl<K, V> Access for UpsertChangeEvent<K, V>
impl<K, V> UpsertChangeEvent<K, V>
where
K: Access,
V: Access,
{
fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> super::AccessResult {
let create_error = |name: String| AccessError::Undefined {
name,
path: String::new(),
};
match path.first() {
Some(&"key") => {
if let Some(ka) = &self.key_accessor {
ka.access(&path[1..], type_expected)
} else {
Err(create_error("key".to_string()))
}
}
Some(&"value") => {
if let Some(va) = &self.value_accessor {
va.access(&path[1..], type_expected)
} else {
Err(create_error("value".to_string()))
}
}
None => Ok(None),
Some(other) => Err(create_error(other.to_string())),
fn access_key(&self, path: &[&str], type_expected: Option<&DataType>) -> super::AccessResult {
if let Some(ka) = &self.key_accessor {
ka.access(path, type_expected)
} else {
Err(AccessError::Undefined {
name: "key".to_string(),
path: String::new(),
})
}
}

fn access_value(&self, path: &[&str], type_expected: Option<&DataType>) -> super::AccessResult {
if let Some(va) = &self.value_accessor {
va.access(path, type_expected)
} else {
Err(AccessError::Undefined {
name: "value".to_string(),
path: String::new(),
})
}
}
}
Expand All @@ -97,7 +94,7 @@ where
V: Access,
{
fn op(&self) -> std::result::Result<ChangeEventOperation, AccessError> {
if let Ok(Some(_)) = self.access(&["value"], None) {
if let Ok(Some(_)) = self.access_value(&[], None) {
Ok(ChangeEventOperation::Upsert)
} else {
Ok(ChangeEventOperation::Delete)
Expand All @@ -110,12 +107,12 @@ where
if let Some(key_as_column_name) = &self.key_column_name
&& &desc.name == key_as_column_name
{
self.access(&["key"], Some(&desc.data_type))
self.access_key(&[], Some(&desc.data_type))
} else {
self.access(&["key", &desc.name], Some(&desc.data_type))
self.access_key(&[&desc.name], Some(&desc.data_type))
}
}
None => self.access(&["value", &desc.name], Some(&desc.data_type)),
None => self.access_value(&[&desc.name], Some(&desc.data_type)),
_ => unreachable!(),
}
}
Expand Down
9 changes: 1 addition & 8 deletions src/connector/src/parser/unified/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{Access, AccessResult, ChangeEvent};
use super::{AccessResult, ChangeEvent};
use crate::parser::unified::ChangeEventOperation;
use crate::parser::SourceStreamChunkRowWriter;
use crate::source::SourceColumnDesc;
Expand All @@ -36,10 +36,3 @@ pub fn apply_row_operation_on_stream_chunk_writer(
let op = row_op.op()?;
apply_row_operation_on_stream_chunk_writer_with_op(row_op, writer, op)
}

pub fn apply_row_accessor_on_stream_chunk_writer(
accessor: impl Access,
writer: &mut SourceStreamChunkRowWriter<'_>,
) -> AccessResult<()> {
writer.insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
}

0 comments on commit 716264c

Please sign in to comment.