Skip to content

Commit

Permalink
feat: support key encode bytes for sink (#19243) (#19308)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
Co-authored-by: Bohan Zhang <[email protected]>
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
3 people authored Nov 9, 2024
1 parent b5d291a commit b78f1db
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 33 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rpk topic create test-rw-sink-upsert-schema
rpk topic create test-rw-sink-debezium
rpk topic create test-rw-sink-without-snapshot
rpk topic create test-rw-sink-text-key-id
rpk topic create test-rw-sink-bytes-key-id

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
sleep 2
Expand Down
80 changes: 77 additions & 3 deletions e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ create sink invalid_pk_column from t_kafka with (

### Test sink with key encode ###

statement error sink key encode unsupported: JSON, only TEXT supported
statement error sink key encode unsupported: JSON, only TEXT and BYTES supported
create sink sink_text_error from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
Expand All @@ -198,8 +198,7 @@ format plain encode json (
force_append_only='true'
) key encode json ;

statement error
# The key encode is TEXT, but the primary key has 2 columns. The key encode TEXT requires the primary key to be a single column.
statement error KEY ENCODE TEXT expects only one primary key, but got 2
create sink sink_text_error from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
Expand Down Expand Up @@ -230,6 +229,54 @@ format plain encode json (
force_append_only='true'
) key encode text ;

statement error sink key encode unsupported: JSON, only TEXT and BYTES supported
create sink sink_bytes_error as (
select int8send(id) as id_bytes, * from t_kafka
) with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
primary_key = 'id_bytes')
format plain encode json (
force_append_only='true'
) key encode json;

statement error KEY ENCODE BYTES expects only one primary key, but got 2
create sink sink_bytes_error as (
select int8send(id) as id_bytes, '\x1234'::bytea as other_bytea, * from t_kafka
) with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
primary_key = 'id_bytes, other_bytea')
format plain encode json (
force_append_only='true'
) key encode bytes;

statement error key encode bytes only works with kafka connector, but found kinesis
create sink sink_bytes_json as (
select int8send(id) as id_bytes, * from t_kafka
) with (
connector = 'kinesis',
topic = 'topic',
properties.bootstrap.server = 'message_queue:29092'
)
format plain encode json (
force_append_only='true'
) key encode bytes;

statement ok
create sink sink_bytes_json as (
select int8send(id) as id_bytes, * from t_kafka
) with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
primary_key = 'id_bytes')
format plain encode json (
force_append_only='true'
) key encode bytes;

statement ok
create table t_sink_text_id (id int)
include key as rw_key
Expand All @@ -239,6 +286,15 @@ with (
topic = 'test-rw-sink-text-key-id',
) format plain encode json;

statement ok
create table t_sink_bytea_id (id int)
include key as rw_key
with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
) format plain encode json;

#======

statement ok
Expand Down Expand Up @@ -286,6 +342,18 @@ select rw_key from t_sink_text_id order by rw_key
\x36
\x37

query T
select rw_key from t_sink_bytea_id order by rw_key
----
\x0000000000000001
\x0000000000000002
\x0000000000000003
\x0000000000000004
\x0000000000000005
\x0000000000000006
\x0000000000000007


statement ok
insert into t_kafka values
(8, 'lv7Eq3g8hx', 194, 19036, 28641, 13652.073, 993.408963466774, '2023-04-13 13:52:09.356742', '\xDEADBABE', '04:00:00.1234', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
Expand All @@ -294,3 +362,9 @@ insert into t_kafka values

statement ok
drop table t_sink_text_id;

statement ok
drop table t_sink_bytea_id;

statement ok
drop sink sink_bytes_json;
4 changes: 3 additions & 1 deletion src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ pub enum SinkEncode {
Template,
Parquet,
Text,
Bytes,
}

impl Display for SinkEncode {
Expand Down Expand Up @@ -205,6 +206,7 @@ impl SinkFormatDesc {
SinkEncode::Template => E::Template,
SinkEncode::Parquet => E::Parquet,
SinkEncode::Text => E::Text,
SinkEncode::Bytes => E::Bytes,
};

let encode = mapping_encode(&self.encode);
Expand Down Expand Up @@ -261,10 +263,10 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
}
};
let key_encode = match &value.key_encode() {
E::Bytes => Some(SinkEncode::Bytes),
E::Text => Some(SinkEncode::Text),
E::Unspecified => None,
encode @ (E::Avro
| E::Bytes
| E::Csv
| E::Json
| E::Protobuf
Expand Down
101 changes: 101 additions & 0 deletions src/connector/src/sink/encoder/bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;

use super::RowEncoder;

/// Row encoder that emits the raw bytes of a single `bytea` column, used as the
/// message key when a sink is declared with `KEY ENCODE BYTES`.
pub struct BytesEncoder {
    pub schema: Schema,
    // the column must contain only one element
    pub col_index: usize,
}

impl BytesEncoder {
pub fn new(schema: Schema, col_index: usize) -> Self {
Self { schema, col_index }
}
}

impl RowEncoder for BytesEncoder {
    type Output = Vec<u8>;

    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn col_indices(&self) -> Option<&[usize]> {
        Some(std::slice::from_ref(&self.col_index))
    }

    /// Serialize the designated column of `row` to its raw bytes.
    ///
    /// Returns `SinkError::Encode` if the column is not of type `bytea`.
    /// A NULL datum encodes as an empty byte vector.
    fn encode_cols(
        &self,
        row: impl risingwave_common::row::Row,
        col_indices: impl Iterator<Item = usize>,
    ) -> crate::sink::Result<Self::Output> {
        // It is guaranteed by the caller that col_indices contains only one element
        let mut encoded = Vec::new();
        for idx in col_indices {
            let field_type = &self.schema.fields[idx].data_type;
            // Guard clause: anything other than bytea cannot be passed through verbatim.
            if field_type != &DataType::Bytea {
                return Err(crate::sink::SinkError::Encode(format!(
                    "Unsupported data type: expected bytea, got {}",
                    field_type
                )));
            }
            encoded = match row.datum_at(idx) {
                Some(scalar) => scalar.into_bytea().to_vec(),
                None => vec![],
            };
        }

        Ok(encoded)
    }
}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::Field;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use super::*;

    #[test]
    fn test_bytes_encoder_ser_bytes() {
        // A bytea payload must round-trip through the encoder unchanged.
        let bytea_schema = Schema::new(vec![Field::with_name(DataType::Bytea, "col1")]);
        let bytea_encoder = BytesEncoder::new(bytea_schema, 0);

        let payload_row =
            OwnedRow::new(vec![Some(ScalarImpl::Bytea(b"some_bytes".to_vec().into()))]);
        assert_eq!(
            bytea_encoder
                .encode_cols(&payload_row, std::iter::once(0))
                .unwrap(),
            b"some_bytes".to_vec()
        );

        // A NULL datum encodes as an empty byte string rather than an error.
        let null_row = OwnedRow::new(vec![None]);
        assert_eq!(
            bytea_encoder
                .encode_cols(&null_row, std::iter::once(0))
                .unwrap(),
            Vec::<u8>::new()
        );

        // Any non-bytea column type is rejected with an encode error.
        let int_schema = Schema::new(vec![Field::with_name(DataType::Int16, "col1")]);
        let int_encoder = BytesEncoder::new(int_schema, 0);
        let int_row = OwnedRow::new(vec![Some(ScalarImpl::Int16(123))]);
        assert!(int_encoder.encode_cols(&int_row, std::iter::once(0)).is_err());
    }
}
1 change: 1 addition & 0 deletions src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::sink::Result;

mod avro;
mod bson;
pub mod bytes;
mod json;
mod proto;
pub mod template;
Expand Down
Loading

0 comments on commit b78f1db

Please sign in to comment.