Skip to content

Commit

Permalink
encode bytes support
Browse files Browse the repository at this point in the history
  • Loading branch information
tabversion committed Nov 4, 2024
1 parent 271faac commit bdbba4d
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 19 deletions.
108 changes: 108 additions & 0 deletions src/connector/src/sink/encoder/bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;

use super::RowEncoder;

/// Encodes a single `bytea` column as its raw bytes. Only used to encode key.
///
/// Columns of any other data type are rejected with an encode error when a
/// row is encoded.
pub struct BytesEncoder {
/// Schema used to look up the data type of the column being encoded.
pub schema: Schema,
/// Index of the one column this encoder handles; it is exposed to callers
/// as a single-element slice via `col_indices`.
pub col_index: usize,
}

impl BytesEncoder {
pub fn new(schema: Schema, col_index: usize) -> Self {
Self { schema, col_index }
}
}

impl RowEncoder for BytesEncoder {
    type Output = Vec<u8>;

    /// Returns the schema this encoder was constructed with.
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    /// Returns the configured column index as a one-element slice.
    fn col_indices(&self) -> Option<&[usize]> {
        Some(std::slice::from_ref(&self.col_index))
    }

    /// Encodes the requested column of `row` as raw bytes.
    ///
    /// A NULL datum is encoded as an empty buffer. The caller is expected to
    /// pass exactly one index in `col_indices`; if several are supplied, the
    /// value produced for the last one is returned.
    ///
    /// # Errors
    ///
    /// Returns `SinkError::Encode` when a requested column is not `bytea`.
    fn encode_cols(
        &self,
        row: impl risingwave_common::row::Row,
        col_indices: impl Iterator<Item = usize>,
    ) -> crate::sink::Result<Self::Output> {
        let mut encoded = Vec::new();

        for idx in col_indices {
            // Read the datum before consulting the schema so that the order of
            // the two lookups stays the same as before.
            let datum = row.datum_at(idx);
            let data_type = &self.schema.fields[idx].data_type;

            // Guard clause: anything other than bytea cannot be passed through.
            if !matches!(data_type, DataType::Bytea) {
                return Err(crate::sink::SinkError::Encode(format!(
                    "Unsupported data type: expected bytea, got {}",
                    data_type
                )));
            }

            encoded = match datum {
                Some(scalar) => scalar.into_bytea().to_vec(),
                None => Vec::new(),
            };
        }

        Ok(encoded)
    }
}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::Field;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use super::*;

    /// Exercises `BytesEncoder::encode_cols` on a non-null value, a NULL
    /// value, an empty value, and a column of an unsupported type.
    #[test]
    fn test_bytes_encoder_ser_bytes() {
        let schema = Schema::new(vec![Field::with_name(DataType::Bytea, "col1")]);
        let encoder = BytesEncoder::new(schema, 0);

        // A non-null bytea value is emitted verbatim.
        let row = OwnedRow::new(vec![Some(ScalarImpl::Bytea(b"some_bytes".to_vec().into()))]);
        assert_eq!(
            encoder.encode_cols(&row, std::iter::once(0)).unwrap(),
            b"some_bytes".to_vec()
        );

        // A NULL datum is encoded as an empty buffer.
        let row = OwnedRow::new(vec![None]);
        assert_eq!(
            encoder.encode_cols(&row, std::iter::once(0)).unwrap(),
            Vec::<u8>::new()
        );

        // An empty but non-null bytea value also yields an empty buffer.
        let row = OwnedRow::new(vec![Some(ScalarImpl::Bytea(Vec::<u8>::new().into()))]);
        assert_eq!(
            encoder.encode_cols(&row, std::iter::once(0)).unwrap(),
            Vec::<u8>::new()
        );

        // Any column type other than bytea must be rejected.
        let schema = Schema::new(vec![Field::with_name(DataType::Int16, "col1")]);
        let encoder = BytesEncoder::new(schema, 0);
        let row = OwnedRow::new(vec![Some(ScalarImpl::Int16(123))]);
        assert!(encoder.encode_cols(&row, std::iter::once(0)).is_err());
    }
}
1 change: 1 addition & 0 deletions src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::sink::Result;

mod avro;
mod bson;
pub mod bytes;
mod json;
mod proto;
pub mod template;
Expand Down
85 changes: 66 additions & 19 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use anyhow::{anyhow, Context};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Field;

use crate::sink::{Result, SinkError};

Expand All @@ -28,6 +29,7 @@ use risingwave_common::types::DataType;
pub use upsert::UpsertFormatter;

use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
use super::encoder::bytes::BytesEncoder;
use super::encoder::template::TemplateEncoder;
use super::encoder::text::TextEncoder;
use super::encoder::{
Expand Down Expand Up @@ -75,21 +77,27 @@ pub enum SinkFormatterImpl {
// append-only
AppendOnlyJson(AppendOnlyFormatter<JsonEncoder, JsonEncoder>),
AppendOnlyTextJson(AppendOnlyFormatter<TextEncoder, JsonEncoder>),
AppendOnlyBytesJson(AppendOnlyFormatter<BytesEncoder, JsonEncoder>),
AppendOnlyAvro(AppendOnlyFormatter<AvroEncoder, AvroEncoder>),
AppendOnlyTextAvro(AppendOnlyFormatter<TextEncoder, AvroEncoder>),
AppendOnlyBytesAvro(AppendOnlyFormatter<BytesEncoder, AvroEncoder>),
AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
AppendOnlyTextProto(AppendOnlyFormatter<TextEncoder, ProtoEncoder>),
AppendOnlyBytesProto(AppendOnlyFormatter<BytesEncoder, ProtoEncoder>),
AppendOnlyTemplate(AppendOnlyFormatter<TemplateEncoder, TemplateEncoder>),
AppendOnlyTextTemplate(AppendOnlyFormatter<TextEncoder, TemplateEncoder>),
// upsert
UpsertJson(UpsertFormatter<JsonEncoder, JsonEncoder>),
UpsertTextJson(UpsertFormatter<TextEncoder, JsonEncoder>),
UpsertBytesJson(UpsertFormatter<BytesEncoder, JsonEncoder>),
UpsertAvro(UpsertFormatter<AvroEncoder, AvroEncoder>),
UpsertTextAvro(UpsertFormatter<TextEncoder, AvroEncoder>),
UpsertBytesAvro(UpsertFormatter<BytesEncoder, AvroEncoder>),
// `UpsertFormatter<ProtoEncoder, ProtoEncoder>` is intentionally left out
// to avoid using `ProtoEncoder` as key:
// <https://docs.confluent.io/platform/7.7/control-center/topics/schema.html#c3-schemas-best-practices-key-value-pairs>
UpsertTextProto(UpsertFormatter<TextEncoder, ProtoEncoder>),
UpsertBytesProto(UpsertFormatter<BytesEncoder, ProtoEncoder>),
UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
UpsertTextTemplate(UpsertFormatter<TextEncoder, TemplateEncoder>),
// debezium
Expand Down Expand Up @@ -168,27 +176,55 @@ impl EncoderBuild for ProtoEncoder {
}
}

impl EncoderBuild for TextEncoder {
/// Checks that exactly one primary key column is configured and resolves it.
///
/// `data_type_name` is the key-encode name interpolated into error messages.
/// On success, returns the primary key's column index together with the
/// matching field of `params.schema`.
///
/// # Errors
///
/// Returns `SinkError::Config` when `pk_indices` is `None`, when it does not
/// contain exactly one index, or when that index is out of bounds for the
/// schema.
fn ensure_only_one_pk<'a>(
    data_type_name: &'a str,
    params: &'a EncoderParams<'_>,
    pk_indices: &'a Option<Vec<usize>>,
) -> Result<(usize, &'a Field)> {
    let indices = pk_indices.as_deref().ok_or_else(|| {
        SinkError::Config(anyhow!(
            "{}Encoder requires primary key columns to be specified",
            data_type_name
        ))
    })?;

    // The slice pattern only matches a primary key made of a single column.
    let &[pk_index] = indices else {
        return Err(SinkError::Config(anyhow!(
            "The key encode is {}, but the primary key has {} columns. The key encode {} requires the primary key to be a single column",
            data_type_name,
            indices.len(),
            data_type_name
        )));
    };

    let field = params.schema.fields().get(pk_index).ok_or_else(|| {
        SinkError::Config(anyhow!(
            "The primary key column index {} is out of bounds in schema {:?}",
            pk_index,
            params.schema
        ))
    })?;

    Ok((pk_index, field))
}

impl EncoderBuild for BytesEncoder {
async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
let Some(pk_indices) = pk_indices else {
return Err(SinkError::Config(anyhow!(
"TextEncoder requires primary key columns to be specified"
)));
};
if pk_indices.len() != 1 {
return Err(SinkError::Config(anyhow!(
"The key encode is TEXT, but the primary key has {} columns. The key encode TEXT requires the primary key to be a single column",
pk_indices.len()
)));
let (pk_index, schema_ref) = ensure_only_one_pk("BYTES", &params, &pk_indices)?;
if let DataType::Bytea = schema_ref.data_type() {
Ok(BytesEncoder::new(params.schema, pk_index))
} else {
Err(SinkError::Config(anyhow!(
"The key encode is BYTES, but the primary key column {} has type {}",
schema_ref.name,
schema_ref.data_type
)))
}
}
}

let schema_ref = params.schema.fields().get(pk_indices[0]).ok_or_else(|| {
SinkError::Config(anyhow!(
"The primary key column index {} is out of bounds in schema {:?}",
pk_indices[0],
params.schema
))
})?;
impl EncoderBuild for TextEncoder {
async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
let (pk_index, schema_ref) = ensure_only_one_pk("TEXT", &params, &pk_indices)?;
match &schema_ref.data_type() {
DataType::Varchar
| DataType::Boolean
Expand All @@ -209,7 +245,7 @@ impl EncoderBuild for TextEncoder {
}
}

Ok(Self::new(params.schema, pk_indices[0]))
Ok(Self::new(params.schema, pk_index))
}
}

Expand Down Expand Up @@ -401,17 +437,23 @@ macro_rules! dispatch_sink_formatter_impl {
($impl:expr, $name:ident, $body:expr) => {
match $impl {
SinkFormatterImpl::AppendOnlyJson($name) => $body,
SinkFormatterImpl::AppendOnlyBytesJson($name) => $body,
SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
SinkFormatterImpl::AppendOnlyAvro($name) => $body,
SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
SinkFormatterImpl::AppendOnlyBytesAvro($name) => $body,
SinkFormatterImpl::AppendOnlyProto($name) => $body,
SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
SinkFormatterImpl::AppendOnlyBytesProto($name) => $body,

SinkFormatterImpl::UpsertJson($name) => $body,
SinkFormatterImpl::UpsertBytesJson($name) => $body,
SinkFormatterImpl::UpsertTextJson($name) => $body,
SinkFormatterImpl::UpsertAvro($name) => $body,
SinkFormatterImpl::UpsertTextAvro($name) => $body,
SinkFormatterImpl::UpsertBytesAvro($name) => $body,
SinkFormatterImpl::UpsertTextProto($name) => $body,
SinkFormatterImpl::UpsertBytesProto($name) => $body,
SinkFormatterImpl::DebeziumJson($name) => $body,
SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
Expand All @@ -426,17 +468,22 @@ macro_rules! dispatch_sink_formatter_str_key_impl {
($impl:expr, $name:ident, $body:expr) => {
match $impl {
SinkFormatterImpl::AppendOnlyJson($name) => $body,
SinkFormatterImpl::AppendOnlyBytesJson($name) => $body,
SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
SinkFormatterImpl::AppendOnlyAvro(_) => unreachable!(),
SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
SinkFormatterImpl::AppendOnlyBytesAvro($name) => $body,
SinkFormatterImpl::AppendOnlyProto($name) => $body,
SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
SinkFormatterImpl::AppendOnlyBytesProto($name) => $body,

SinkFormatterImpl::UpsertJson($name) => $body,
SinkFormatterImpl::UpsertTextJson($name) => $body,
SinkFormatterImpl::UpsertAvro(_) => unreachable!(),
SinkFormatterImpl::UpsertTextAvro($name) => $body,
SinkFormatterImpl::UpsertBytesAvro($name) => $body,
SinkFormatterImpl::UpsertTextProto($name) => $body,
SinkFormatterImpl::UpsertBytesProto($name) => $body,
SinkFormatterImpl::DebeziumJson($name) => $body,
SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
Expand Down

0 comments on commit bdbba4d

Please sign in to comment.