Skip to content

Commit

Permalink
byebye upsert event
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed May 31, 2024
1 parent 8567878 commit 992d083
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 114 deletions.
3 changes: 1 addition & 2 deletions src/connector/benches/json_vs_plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ mod old_json_parser {
let mut errors = Vec::new();
for value in values {
let accessor = JsonAccess::new(value);
match writer
.insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
match writer.do_insert(|column| accessor.access(&[&column.name], &column.data_type))
{
Ok(_) => {}
Err(err) => errors.push(err),
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/csv_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl CsvParser {
// The header row does not output a row, so we return early.
return Ok(());
}
writer.insert(|desc| {
writer.do_insert(|desc| {
if let Some(i) = headers.iter().position(|name| name == &desc.name) {
let value = fields.get_mut(i).map(std::mem::take).unwrap_or_default();
if value.is_empty() {
Expand All @@ -125,7 +125,7 @@ impl CsvParser {
})?;
} else {
fields.reverse();
writer.insert(|desc| {
writer.do_insert(|desc| {
if let Some(value) = fields.pop() {
if value.is_empty() {
return Ok(None);
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct JsonAccessBuilder {
impl AccessBuilder for JsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
// XXX: When will we enter this branch?
if payload.is_empty() {
self.value = Some("{}".into());
} else {
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ impl SourceStreamChunkRowWriter<'_> {
/// produces one [`Datum`] by corresponding [`SourceColumnDesc`].
///
/// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
pub fn insert(
pub fn do_insert(
&mut self,
f: impl FnMut(&SourceColumnDesc) -> AccessResult<Datum>,
) -> AccessResult<()> {
Expand All @@ -501,7 +501,7 @@ impl SourceStreamChunkRowWriter<'_> {
/// produces one [`Datum`] by corresponding [`SourceColumnDesc`].
///
/// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
pub fn delete(
pub fn do_delete(
&mut self,
f: impl FnMut(&SourceColumnDesc) -> AccessResult<Datum>,
) -> AccessResult<()> {
Expand All @@ -512,7 +512,7 @@ impl SourceStreamChunkRowWriter<'_> {
/// produces two [`Datum`]s as old and new value by corresponding [`SourceColumnDesc`].
///
/// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
pub fn update(
pub fn do_update(
&mut self,
f: impl FnMut(&SourceColumnDesc) -> AccessResult<(Datum, Datum)>,
) -> AccessResult<()> {
Expand Down Expand Up @@ -590,7 +590,7 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
}

fn emit_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
_ = writer.insert(|_column| Ok(None));
_ = writer.do_insert(|_column| Ok(None));
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use risingwave_common::bail;

use super::unified::json::TimestamptzHandling;
use super::unified::upsert::PlainEvent;
use super::unified::kv_event::KvEvent;
use super::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
SourceStreamChunkRowWriter, SpecificParserConfig,
Expand Down Expand Up @@ -102,7 +102,7 @@ impl PlainParser {
};
}

let mut row_op: PlainEvent<AccessImpl<'_, '_>, AccessImpl<'_, '_>> = PlainEvent::default();
let mut row_op: KvEvent<AccessImpl<'_, '_>, AccessImpl<'_, '_>> = KvEvent::default();

if let Some(data) = key
&& let Some(key_builder) = self.key_builder.as_mut()
Expand All @@ -115,7 +115,7 @@ impl PlainParser {
row_op.with_value(self.payload_builder.generate_accessor(data).await?);
}

writer.insert(|column: &SourceColumnDesc| row_op.access_field_impl(column))?;
writer.do_insert(|column: &SourceColumnDesc| row_op.access_field(column))?;

Ok(ParseResult::Rows)
}
Expand Down
11 changes: 1 addition & 10 deletions src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_common::types::{
};
use risingwave_common::util::iter_util::ZipEqFast;

use super::{bail_uncategorized, uncategorized, Access, AccessError, AccessResult, NullableAccess};
use super::{bail_uncategorized, uncategorized, Access, AccessError, AccessResult};
use crate::error::ConnectorResult;
use crate::parser::avro::util::avro_to_jsonb;
#[derive(Clone)]
Expand Down Expand Up @@ -307,15 +307,6 @@ where
}
}

impl<'a, 'b> NullableAccess for AvroAccess<'a, 'b>
where
'a: 'b,
{
fn is_null(&self) -> bool {
matches!(self.value, Value::Null)
}
}

pub(crate) fn avro_decimal_to_rust_decimal(
avro_decimal: AvroDecimal,
_precision: usize,
Expand Down
11 changes: 1 addition & 10 deletions src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use simd_json::prelude::{
use simd_json::{BorrowedValue, ValueType};
use thiserror_ext::AsReport;

use super::{Access, AccessError, AccessResult, NullableAccess};
use super::{Access, AccessError, AccessResult};
use crate::parser::common::json_object_get_case_insensitive;
use crate::parser::unified::avro::extract_decimal;
use crate::schema::{bail_invalid_option_error, InvalidOptionError};
Expand Down Expand Up @@ -647,12 +647,3 @@ where
self.options.parse(value, type_expected)
}
}

impl<'a, 'b> NullableAccess for JsonAccess<'a, 'b>
where
'a: 'b,
{
fn is_null(&self) -> bool {
matches!(self.value.value_type(), ValueType::Null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::{Deref, DerefMut};

use risingwave_common::types::DataType;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;

use super::{Access, ChangeEvent, ChangeEventOperation, NullableAccess};
use super::Access;
use crate::parser::unified::AccessError;
use crate::source::SourceColumnDesc;

Expand Down Expand Up @@ -78,61 +76,11 @@ where
}
}

pub fn access_field_impl(&self, desc: &SourceColumnDesc) -> super::AccessResult {
pub fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult {
match desc.additional_column.column_type {
Some(AdditionalColumnType::Key(_)) => self.access_key(&[&desc.name], &desc.data_type),
None => self.access_value(&[&desc.name], &desc.data_type),
_ => unreachable!(),
}
}
}

/// Wraps a key-value message into an upsert event, which uses `null` value to represent `DELETE`s.
pub struct UpsertChangeEvent<K, V>(KvEvent<K, V>);

impl<K, V> Default for UpsertChangeEvent<K, V> {
fn default() -> Self {
Self(KvEvent::default())
}
}

impl<K, V> Deref for UpsertChangeEvent<K, V> {
type Target = KvEvent<K, V>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<K, V> DerefMut for UpsertChangeEvent<K, V> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl<K, V> ChangeEvent for UpsertChangeEvent<K, V>
where
K: Access,
V: NullableAccess,
{
fn op(&self) -> std::result::Result<ChangeEventOperation, AccessError> {
if let Some(va) = &self.0.value_accessor {
if va.is_null() {
Ok(ChangeEventOperation::Delete)
} else {
Ok(ChangeEventOperation::Upsert)
}
} else {
Err(AccessError::Undefined {
name: "value".to_string(),
path: String::new(),
})
}
}

fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult {
self.0.access_field_impl(desc)
}
}

pub type PlainEvent<K, V> = KvEvent<K, V>;
20 changes: 1 addition & 19 deletions src/connector/src/parser/unified/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ pub mod avro;
pub mod bytes;
pub mod debezium;
pub mod json;
pub mod kv_event;
pub mod maxwell;
pub mod protobuf;
pub mod upsert;
pub mod util;

pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;
Expand All @@ -42,11 +42,6 @@ pub trait Access {
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult;
}

/// Whether the whole message is null. This is used for UPSERT events: `null` means DELETE.
pub trait NullableAccess: Access {
fn is_null(&self) -> bool;
}

pub enum AccessImpl<'a, 'b> {
Avro(AvroAccess<'a, 'b>),
Bytes(BytesAccess<'a>),
Expand All @@ -67,19 +62,6 @@ impl Access for AccessImpl<'_, '_> {
}
}

impl NullableAccess for AccessImpl<'_, '_> {
fn is_null(&self) -> bool {
match self {
Self::Avro(accessor) => accessor.is_null(),
Self::Json(accessor) => accessor.is_null(),
Self::Bytes(_) | Self::Protobuf(_) | Self::MongoJson(_) => {
// TODO: refactor upsert related code to avoid this branch
unreachable!()
}
}
}
}

#[derive(Debug, Clone, Copy)]
pub enum ChangeEventOperation {
Upsert, // Insert or Update
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/unified/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pub fn apply_row_operation_on_stream_chunk_writer_with_op(
) -> AccessResult<()> {
let f = |column: &SourceColumnDesc| row_op.access_field(column);
match op {
ChangeEventOperation::Upsert => writer.insert(f),
ChangeEventOperation::Delete => writer.delete(f),
ChangeEventOperation::Upsert => writer.do_insert(f),
ChangeEventOperation::Delete => writer.do_delete(f),
}
}

Expand Down
19 changes: 11 additions & 8 deletions src/connector/src/parser/upsert_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ use risingwave_common::bail;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;

use super::bytes_parser::BytesAccessBuilder;
use super::unified::upsert::UpsertChangeEvent;
use super::unified::util::apply_row_operation_on_stream_chunk_writer_with_op;
use super::unified::{AccessImpl, ChangeEventOperation};
use super::{
AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties, EncodingType,
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::error::ConnectorResult;
use crate::parser::unified::kv_event::KvEvent;
use crate::parser::ParserFormat;
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

Expand Down Expand Up @@ -97,22 +96,26 @@ impl UpsertParser {
payload: Option<Vec<u8>>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> ConnectorResult<()> {
let mut row_op: UpsertChangeEvent<AccessImpl<'_, '_>, AccessImpl<'_, '_>> =
UpsertChangeEvent::default();
let mut change_event_op = ChangeEventOperation::Delete;
let mut row_op: KvEvent<AccessImpl<'_, '_>, AccessImpl<'_, '_>> = KvEvent::default();
if let Some(data) = key {
row_op.with_key(self.key_builder.generate_accessor(data).await?);
}
// Empty payload of kafka is Some(vec![])
let change_event_op;
if let Some(data) = payload
&& !data.is_empty()
{
row_op.with_value(self.payload_builder.generate_accessor(data).await?);
change_event_op = ChangeEventOperation::Upsert;
} else {
change_event_op = ChangeEventOperation::Delete;
}

apply_row_operation_on_stream_chunk_writer_with_op(row_op, &mut writer, change_event_op)
.map_err(Into::into)
let f = |column: &SourceColumnDesc| row_op.access_field(column);
match change_event_op {
ChangeEventOperation::Upsert => writer.do_insert(f)?,
ChangeEventOperation::Delete => writer.do_delete(f)?,
}
Ok(())
}
}

Expand Down

0 comments on commit 992d083

Please sign in to comment.