Skip to content

Commit

Permalink
use more cow
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jun 10, 2024
1 parent 3eaf2fb commit 53177c7
Show file tree
Hide file tree
Showing 22 changed files with 119 additions and 103 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions src/common/src/types/cow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ use super::{Datum, DatumRef, ToDatumRef, ToOwnedDatum};
///
/// We do not use [`std::borrow::Cow`] because it requires the borrowed variant
/// to be a reference, whereas what we have is a [`DatumRef`] with a lifetime.
///
/// # Usage
///
/// Generally, you don't need to match on the variants of `DatumCow` to access
/// the underlying datum. Instead, you can...
///
/// - call [`to_datum_ref`](ToDatumRef::to_datum_ref) to get a borrowed
/// [`DatumRef`] without any allocation, which can be used to append to an
/// array builder or to encode into the storage representation,
///
/// - call [`to_owned_datum`](ToOwnedDatum::to_owned_datum) to get an owned
/// [`Datum`] with potentially an allocation, which can be used to store in a
/// struct without lifetime constraints.
#[derive(Debug, Clone)]
pub enum DatumCow<'a> {
Borrowed(DatumRef<'a>),
Expand Down
3 changes: 1 addition & 2 deletions src/connector/benches/json_vs_plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ mod old_json_parser {
let mut errors = Vec::new();
for value in values {
let accessor = JsonAccess::new(value);
match writer
.do_insert(|column| accessor.access_cow(&[&column.name], &column.data_type))
match writer.do_insert(|column| accessor.access(&[&column.name], &column.data_type))
{
Ok(_) => {}
Err(err) => errors.push(err),
Expand Down
1 change: 1 addition & 0 deletions src/connector/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
easy-ext = "1"
itertools = { workspace = true }
jsonbb = { workspace = true }
num-bigint = "0.4"
Expand Down
23 changes: 11 additions & 12 deletions src/connector/codec/src/decoder/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::bail;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{
DataType, Date, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
DataType, Date, DatumCow, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
use risingwave_common::util::iter_util::ZipEqFast;

Expand Down Expand Up @@ -266,23 +266,19 @@ impl<'a> AvroParseOptions<'a> {
}
}

// TODO: No need to use two lifetimes here.
pub struct AvroAccess<'a, 'b> {
pub struct AvroAccess<'a> {
value: &'a Value,
options: AvroParseOptions<'b>,
options: AvroParseOptions<'a>,
}

impl<'a, 'b> AvroAccess<'a, 'b> {
pub fn new(value: &'a Value, options: AvroParseOptions<'b>) -> Self {
impl<'a> AvroAccess<'a> {
pub fn new(value: &'a Value, options: AvroParseOptions<'a>) -> Self {
Self { value, options }
}
}

impl<'a, 'b> Access for AvroAccess<'a, 'b>
where
'a: 'b,
{
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult {
impl Access for AvroAccess<'_> {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
let mut value = self.value;
let mut options: AvroParseOptions<'_> = self.options.clone();

Expand Down Expand Up @@ -312,7 +308,10 @@ where
Err(create_error())?;
}

options.convert_to_datum(value, type_expected)
// TODO: may borrow the value directly
options
.convert_to_datum(value, type_expected)
.map(Into::into)
}
}

Expand Down
35 changes: 19 additions & 16 deletions src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,36 @@ pub enum AccessError {
pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;

/// Access to a field in the data structure.
///
/// Only one of these two methods should be implemented. See documentation for more details.
pub trait Access {
/// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data),
/// and then converts it to RisingWave `Datum`.
/// `type_expected` might or might not be used during the conversion depending on the implementation.
///
/// # Path
///
/// We usually expect the data is a record (struct), and `path` represents field path.
/// The data (or part of the data) represents the whole row (`Vec<Datum>`),
/// and we use different `path` to access one column at a time.
///
/// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`.
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
self.access_cow(path, type_expected)
.map(ToOwnedDatum::to_owned_datum)
}
///
/// # Returns
///
/// The implementation should prefer to return a borrowed [`DatumRef`](risingwave_common::types::DatumRef)
/// through [`DatumCow::Borrowed`] to avoid unnecessary allocation if possible, especially for fields
/// with string or bytes data. If that's not the case, it may return an owned [`Datum`] through
/// [`DatumCow::Owned`].
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>>;
}

/// Similar to `access`, but may return a borrowed [`DatumCow::Borrowed`] to avoid unnecessary allocation.
/// If not overridden, it will call forward to `access` and always wrap the result in [`DatumCow::Owned`].
// Note: made an extension trait to disallow implementing or overriding `access_owned`.
#[easy_ext::ext(AccessExt)]
impl<A: Access> A {
/// Similar to `access`, but always returns an owned [`Datum`]. See [`Access::access`] for more details.
///
/// This should be preferred over `access` for both callers and implementors.
// TODO: implement this method in all parsers and remove `access` method.
fn access_cow<'a>(
&'a self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>> {
self.access(path, type_expected).map(Into::into)
/// Always prefer calling `access` directly if possible to avoid unnecessary allocation.
pub fn access_owned(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
self.access(path, type_expected)
.map(ToOwnedDatum::to_owned_datum)
}
}
2 changes: 1 addition & 1 deletion src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct AvroAccessBuilder {
}

impl AccessBuilder for AvroAccessBuilder {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
self.value = self.parse_avro_value(&payload).await?;
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_ref().unwrap(),
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/bytes_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct BytesAccessBuilder {

impl AccessBuilder for BytesAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
Ok(AccessImpl::Bytes(BytesAccess::new(
&self.column_name,
payload,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct DebeziumAvroAccessBuilder {

// TODO: reduce encodingtype match
impl AccessBuilder for DebeziumAvroAccessBuilder {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
let (schema_id, mut raw_payload) = extract_schema_id(&payload)?;
let schema = self.schema_resolver.get_by_id(schema_id).await?;
self.value = Some(from_avro_datum(schema.as_ref(), &mut raw_payload, None)?);
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl DebeziumJsonAccessBuilder {

impl AccessBuilder for DebeziumJsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
self.value = Some(payload);
let mut event: BorrowedValue<'_> =
simd_json::to_borrowed_value(self.value.as_mut().unwrap())
Expand Down Expand Up @@ -79,7 +79,7 @@ impl DebeziumMongoJsonAccessBuilder {

impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
self.value = Some(payload);
let mut event: BorrowedValue<'_> =
simd_json::to_borrowed_value(self.value.as_mut().unwrap())
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct JsonAccessBuilder {

impl AccessBuilder for JsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
// XXX: When will we enter this branch?
if payload.is_empty() {
self.value = Some("{}".into());
Expand Down
7 changes: 2 additions & 5 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
}

pub trait AccessBuilder {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>>;
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>>;
}

#[derive(Debug)]
Expand Down Expand Up @@ -887,10 +887,7 @@ impl AccessBuilderImpl {
Ok(accessor)
}

pub async fn generate_accessor(
&mut self,
payload: Vec<u8>,
) -> ConnectorResult<AccessImpl<'_, '_>> {
pub async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
let accessor = match self {
Self::Avro(builder) => builder.generate_accessor(payload).await?,
Self::Protobuf(builder) => builder.generate_accessor(payload).await?,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl PlainParser {
};
}

let mut row_op: KvEvent<AccessImpl<'_, '_>, AccessImpl<'_, '_>> = KvEvent::default();
let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();

if let Some(data) = key
&& let Some(key_builder) = self.key_builder.as_mut()
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct ProtobufAccessBuilder {

impl AccessBuilder for ProtobufAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
let payload = if self.confluent_wire_type {
resolve_pb_header(&payload)?
} else {
Expand Down Expand Up @@ -583,6 +583,7 @@ mod test {

use prost::Message;
use risingwave_common::types::StructType;
use risingwave_connector_codec::decoder::AccessExt;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::data::data_type::PbTypeName;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};
Expand All @@ -591,7 +592,6 @@ mod test {
use super::*;
use crate::parser::protobuf::recursive::all_types::{EnumType, ExampleOneof, NestedMessage};
use crate::parser::protobuf::recursive::AllTypes;
use crate::parser::unified::Access;
use crate::parser::SpecificParserConfig;

fn schema_dir() -> String {
Expand Down Expand Up @@ -896,7 +896,7 @@ mod test {

fn pb_eq(a: &ProtobufAccess, field_name: &str, value: ScalarImpl) {
let dummy_type = DataType::Varchar;
let d = a.access(&[field_name], &dummy_type).unwrap().unwrap();
let d = a.access_owned(&[field_name], &dummy_type).unwrap().unwrap();
assert_eq!(d, value, "field: {} value: {:?}", field_name, d);
}

Expand Down
10 changes: 6 additions & 4 deletions src/connector/src/parser/unified/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::types::{DataType, DatumCow, ScalarRefImpl};

use super::{Access, AccessError, AccessResult};

Expand All @@ -29,14 +29,16 @@ impl<'a> BytesAccess<'a> {
}
}

impl<'a> Access for BytesAccess<'a> {
impl Access for BytesAccess<'_> {
/// path is empty currently, `type_expected` should be `Bytea`
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
if let DataType::Bytea = type_expected {
if self.column_name.is_none()
|| (path.len() == 1 && self.column_name.as_ref().unwrap() == path[0])
{
return Ok(Some(ScalarImpl::Bytea(Box::from(self.bytes.as_slice()))));
return Ok(DatumCow::Borrowed(Some(ScalarRefImpl::Bytea(
self.bytes.as_slice(),
))));
}
return Err(AccessError::Undefined {
name: path[0].to_string(),
Expand Down
Loading

0 comments on commit 53177c7

Please sign in to comment.