Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(connector): migrate to zero-copy access implementation #17165

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions src/common/src/types/cow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ use super::{Datum, DatumRef, ToDatumRef, ToOwnedDatum};
///
/// We do not use [`std::borrow::Cow`] because it requires the borrowed variant
/// to be a reference, whereas what we have is a [`DatumRef`] with a lifetime.
///
/// # Usage
///
/// Generally, you don't need to match on the variants of `DatumCow` to access
/// the underlying datum. Instead, you can...
///
/// - call [`to_datum_ref`](ToDatumRef::to_datum_ref) to get a borrowed
/// [`DatumRef`] without any allocation, which can be used to append to an
/// array builder or to encode into the storage representation,
///
/// - call [`to_owned_datum`](ToOwnedDatum::to_owned_datum) to get an owned
/// [`Datum`], potentially with an allocation, which can be stored in a
/// struct without lifetime constraints.
#[derive(Debug, Clone)]
pub enum DatumCow<'a> {
Borrowed(DatumRef<'a>),
Expand Down
3 changes: 1 addition & 2 deletions src/connector/benches/json_vs_plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ mod old_json_parser {
let mut errors = Vec::new();
for value in values {
let accessor = JsonAccess::new(value);
match writer
.do_insert(|column| accessor.access_cow(&[&column.name], &column.data_type))
match writer.do_insert(|column| accessor.access(&[&column.name], &column.data_type))
{
Ok(_) => {}
Err(err) => errors.push(err),
Expand Down
1 change: 1 addition & 0 deletions src/connector/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
easy-ext = "1"
itertools = { workspace = true }
jsonbb = { workspace = true }
num-bigint = "0.4"
Expand Down
23 changes: 11 additions & 12 deletions src/connector/codec/src/decoder/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::bail;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{
DataType, Date, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
DataType, Date, DatumCow, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
use risingwave_common::util::iter_util::ZipEqFast;

Expand Down Expand Up @@ -266,23 +266,19 @@ impl<'a> AvroParseOptions<'a> {
}
}

// TODO: No need to use two lifetimes here.
pub struct AvroAccess<'a, 'b> {
pub struct AvroAccess<'a> {
value: &'a Value,
options: AvroParseOptions<'b>,
options: AvroParseOptions<'a>,
}

impl<'a, 'b> AvroAccess<'a, 'b> {
pub fn new(value: &'a Value, options: AvroParseOptions<'b>) -> Self {
impl<'a> AvroAccess<'a> {
pub fn new(value: &'a Value, options: AvroParseOptions<'a>) -> Self {
Self { value, options }
}
}

impl<'a, 'b> Access for AvroAccess<'a, 'b>
where
'a: 'b,
{
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult {
impl Access for AvroAccess<'_> {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
let mut value = self.value;
let mut options: AvroParseOptions<'_> = self.options.clone();

Expand Down Expand Up @@ -312,7 +308,10 @@ where
Err(create_error())?;
}

options.convert_to_datum(value, type_expected)
// TODO: may borrow the value directly
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will address in #17171.

options
.convert_to_datum(value, type_expected)
.map(Into::into)
}
}

Expand Down
35 changes: 19 additions & 16 deletions src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,36 @@ pub enum AccessError {
pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;

/// Access to a field in the data structure.
///
/// Only one of these two methods should be implemented. See documentation for more details.
pub trait Access {
/// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data),
/// and then converts it to RisingWave `Datum`.
/// `type_expected` might or might not be used during the conversion depending on the implementation.
///
/// # Path
///
/// We usually expect the data to be a record (struct), and `path` represents the field path.
/// The data (or part of the data) represents the whole row (`Vec<Datum>`),
/// and we use different `path` to access one column at a time.
///
/// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`.
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
self.access_cow(path, type_expected)
.map(ToOwnedDatum::to_owned_datum)
}
///
/// # Returns
///
/// The implementation should prefer to return a borrowed [`DatumRef`](risingwave_common::types::DatumRef)
/// through [`DatumCow::Borrowed`] to avoid unnecessary allocation if possible, especially for fields
/// with string or bytes data. If that's not the case, it may return an owned [`Datum`] through
/// [`DatumCow::Owned`].
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>>;
}

/// Similar to `access`, but may return a borrowed [`DatumCow::Borrowed`] to avoid unnecessary allocation.
/// If not overridden, it will call forward to `access` and always wrap the result in [`DatumCow::Owned`].
// Note: this is an extension trait so that `access_owned` cannot be implemented or overridden.
#[easy_ext::ext(AccessExt)]
impl<A: Access> A {
/// Similar to `access`, but always returns an owned [`Datum`]. See [`Access::access`] for more details.
///
/// This should be preferred over `access` for both callers and implementors.
// TODO: implement this method in all parsers and remove `access` method.
fn access_cow<'a>(
&'a self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>> {
self.access(path, type_expected).map(Into::into)
/// Always prefer calling `access` directly if possible to avoid unnecessary allocation.
pub fn access_owned(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
self.access(path, type_expected)
.map(ToOwnedDatum::to_owned_datum)
}
}
2 changes: 1 addition & 1 deletion src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct AvroAccessBuilder {
}

impl AccessBuilder for AvroAccessBuilder {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
self.value = self.parse_avro_value(&payload).await?;
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_ref().unwrap(),
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/bytes_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct BytesAccessBuilder {

impl AccessBuilder for BytesAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
Ok(AccessImpl::Bytes(BytesAccess::new(
&self.column_name,
payload,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct DebeziumAvroAccessBuilder {

// TODO: reduce encodingtype match
impl AccessBuilder for DebeziumAvroAccessBuilder {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
let (schema_id, mut raw_payload) = extract_schema_id(&payload)?;
let schema = self.schema_resolver.get_by_id(schema_id).await?;
self.value = Some(from_avro_datum(schema.as_ref(), &mut raw_payload, None)?);
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl DebeziumJsonAccessBuilder {

impl AccessBuilder for DebeziumJsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
self.value = Some(payload);
let mut event: BorrowedValue<'_> =
simd_json::to_borrowed_value(self.value.as_mut().unwrap())
Expand Down Expand Up @@ -79,7 +79,7 @@ impl DebeziumMongoJsonAccessBuilder {

impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
self.value = Some(payload);
let mut event: BorrowedValue<'_> =
simd_json::to_borrowed_value(self.value.as_mut().unwrap())
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct JsonAccessBuilder {

impl AccessBuilder for JsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
// XXX: When will we enter this branch?
if payload.is_empty() {
self.value = Some("{}".into());
Expand Down
7 changes: 2 additions & 5 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
}

pub trait AccessBuilder {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>>;
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>>;
}

#[derive(Debug)]
Expand Down Expand Up @@ -887,10 +887,7 @@ impl AccessBuilderImpl {
Ok(accessor)
}

pub async fn generate_accessor(
&mut self,
payload: Vec<u8>,
) -> ConnectorResult<AccessImpl<'_, '_>> {
pub async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
let accessor = match self {
Self::Avro(builder) => builder.generate_accessor(payload).await?,
Self::Protobuf(builder) => builder.generate_accessor(payload).await?,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl PlainParser {
};
}

let mut row_op: KvEvent<AccessImpl<'_, '_>, AccessImpl<'_, '_>> = KvEvent::default();
let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();

if let Some(data) = key
&& let Some(key_builder) = self.key_builder.as_mut()
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct ProtobufAccessBuilder {

impl AccessBuilder for ProtobufAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
let payload = if self.confluent_wire_type {
resolve_pb_header(&payload)?
} else {
Expand Down Expand Up @@ -583,6 +583,7 @@ mod test {

use prost::Message;
use risingwave_common::types::StructType;
use risingwave_connector_codec::decoder::AccessExt;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::data::data_type::PbTypeName;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};
Expand All @@ -591,7 +592,6 @@ mod test {
use super::*;
use crate::parser::protobuf::recursive::all_types::{EnumType, ExampleOneof, NestedMessage};
use crate::parser::protobuf::recursive::AllTypes;
use crate::parser::unified::Access;
use crate::parser::SpecificParserConfig;

fn schema_dir() -> String {
Expand Down Expand Up @@ -896,7 +896,7 @@ mod test {

fn pb_eq(a: &ProtobufAccess, field_name: &str, value: ScalarImpl) {
let dummy_type = DataType::Varchar;
let d = a.access(&[field_name], &dummy_type).unwrap().unwrap();
let d = a.access_owned(&[field_name], &dummy_type).unwrap().unwrap();
assert_eq!(d, value, "field: {} value: {:?}", field_name, d);
}

Expand Down
10 changes: 6 additions & 4 deletions src/connector/src/parser/unified/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::types::{DataType, DatumCow, ScalarRefImpl};

use super::{Access, AccessError, AccessResult};

Expand All @@ -29,14 +29,16 @@ impl<'a> BytesAccess<'a> {
}
}

impl<'a> Access for BytesAccess<'a> {
impl Access for BytesAccess<'_> {
/// path is empty currently, `type_expected` should be `Bytea`
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
if let DataType::Bytea = type_expected {
if self.column_name.is_none()
|| (path.len() == 1 && self.column_name.as_ref().unwrap() == path[0])
{
return Ok(Some(ScalarImpl::Bytea(Box::from(self.bytes.as_slice()))));
return Ok(DatumCow::Borrowed(Some(ScalarRefImpl::Bytea(
self.bytes.as_slice(),
))));
}
return Err(AccessError::Undefined {
name: path[0].to_string(),
Expand Down
Loading
Loading