feat: support key encode bytes for sink (#19243) #19308

Merged
1 change: 1 addition & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
@@ -11,6 +11,7 @@ rpk topic create test-rw-sink-upsert-schema
rpk topic create test-rw-sink-debezium
rpk topic create test-rw-sink-without-snapshot
rpk topic create test-rw-sink-text-key-id
rpk topic create test-rw-sink-bytes-key-id

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
sleep 2
80 changes: 77 additions & 3 deletions e2e_test/sink/kafka/create_sink.slt
@@ -188,7 +188,7 @@ create sink invalid_pk_column from t_kafka with (

### Test sink with key encode ###

-statement error sink key encode unsupported: JSON, only TEXT supported
+statement error sink key encode unsupported: JSON, only TEXT and BYTES supported
create sink sink_text_error from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
@@ -198,8 +198,7 @@ format plain encode json (
force_append_only='true'
) key encode json ;

-statement error
-# The key encode is TEXT, but the primary key has 2 columns. The key encode TEXT requires the primary key to be a single column.
+statement error KEY ENCODE TEXT expects only one primary key, but got 2
create sink sink_text_error from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
@@ -230,6 +229,54 @@ format plain encode json (
force_append_only='true'
) key encode text ;

statement error sink key encode unsupported: JSON, only TEXT and BYTES supported
create sink sink_bytes_error as (
select int8send(id) as id_bytes, * from t_kafka
) with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
primary_key = 'id_bytes')
format plain encode json (
force_append_only='true'
) key encode json;

statement error KEY ENCODE BYTES expects only one primary key, but got 2
create sink sink_bytes_error as (
select int8send(id) as id_bytes, '\x1234'::bytea as other_bytea, * from t_kafka
) with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
primary_key = 'id_bytes, other_bytea')
format plain encode json (
force_append_only='true'
) key encode bytes;

statement error key encode bytes only works with kafka connector, but found kinesis
create sink sink_bytes_json as (
select int8send(id) as id_bytes, * from t_kafka
) with (
connector = 'kinesis',
topic = 'topic',
properties.bootstrap.server = 'message_queue:29092'
)
format plain encode json (
force_append_only='true'
) key encode bytes;

statement ok
create sink sink_bytes_json as (
select int8send(id) as id_bytes, * from t_kafka
) with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
primary_key = 'id_bytes')
format plain encode json (
force_append_only='true'
) key encode bytes;

statement ok
create table t_sink_text_id (id int)
include key as rw_key
@@ -239,6 +286,15 @@ with (
topic = 'test-rw-sink-text-key-id',
) format plain encode json;

statement ok
create table t_sink_bytea_id (id int)
include key as rw_key
with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
) format plain encode json;

#======

statement ok
@@ -286,6 +342,18 @@ select rw_key from t_sink_text_id order by rw_key
\x36
\x37

query T
select rw_key from t_sink_bytea_id order by rw_key
----
\x0000000000000001
\x0000000000000002
\x0000000000000003
\x0000000000000004
\x0000000000000005
\x0000000000000006
\x0000000000000007


statement ok
insert into t_kafka values
(8, 'lv7Eq3g8hx', 194, 19036, 28641, 13652.073, 993.408963466774, '2023-04-13 13:52:09.356742', '\xDEADBABE', '04:00:00.1234', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
@@ -294,3 +362,9 @@ insert into t_kafka values

statement ok
drop table t_sink_text_id;

statement ok
drop table t_sink_bytea_id;

statement ok
drop sink sink_bytes_json;
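
For reference, the rw_key values asserted above follow from int8send: it is assumed (matching Postgres-compatible behavior, and consistent with the \x0000000000000001..\x0000000000000007 results) to render a BIGINT as its 8-byte network-order, i.e. big-endian, representation. A minimal Rust sketch of that assumption:

fn main() {
    // Assumed int8send behavior: BIGINT serialized as 8 big-endian bytes,
    // so id = 1 becomes the Kafka key \x0000000000000001 queried above.
    let key: Vec<u8> = 1i64.to_be_bytes().to_vec();
    assert_eq!(key, vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01]);
}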
4 changes: 3 additions & 1 deletion src/connector/src/sink/catalog/mod.rs
@@ -149,6 +149,7 @@ pub enum SinkEncode {
Template,
Parquet,
Text,
Bytes,
}

impl Display for SinkEncode {
@@ -205,6 +206,7 @@ impl SinkFormatDesc {
SinkEncode::Template => E::Template,
SinkEncode::Parquet => E::Parquet,
SinkEncode::Text => E::Text,
SinkEncode::Bytes => E::Bytes,
};

let encode = mapping_encode(&self.encode);
@@ -261,10 +263,10 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
}
};
let key_encode = match &value.key_encode() {
+E::Bytes => Some(SinkEncode::Bytes),
E::Text => Some(SinkEncode::Text),
E::Unspecified => None,
encode @ (E::Avro
-| E::Bytes
| E::Csv
| E::Json
| E::Protobuf
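The hunks above let BYTES round-trip through the protobuf format descriptor as a key encode: SinkEncode::Bytes now serializes to the proto Bytes variant, and proto Bytes deserializes back to Some(SinkEncode::Bytes). A self-contained sketch of the decode side (PbEncode, KeyEncode, and map_key_encode are illustrative stand-ins, not this crate's actual items):

#[derive(Debug, Clone, Copy)]
enum PbEncode { Unspecified, Text, Bytes, Json }

#[derive(Debug, PartialEq)]
enum KeyEncode { Text, Bytes }

// Mirrors the TryFrom arm order above: BYTES and TEXT map to a key encode,
// UNSPECIFIED means "no key encode", and any other variant is rejected.
fn map_key_encode(e: PbEncode) -> Result<Option<KeyEncode>, String> {
    match e {
        PbEncode::Bytes => Ok(Some(KeyEncode::Bytes)),
        PbEncode::Text => Ok(Some(KeyEncode::Text)),
        PbEncode::Unspecified => Ok(None),
        other => Err(format!("unsupported sink key encode: {:?}", other)),
    }
}

fn main() {
    assert_eq!(map_key_encode(PbEncode::Bytes).unwrap(), Some(KeyEncode::Bytes));
    assert!(map_key_encode(PbEncode::Json).is_err());
}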
101 changes: 101 additions & 0 deletions src/connector/src/sink/encoder/bytes.rs
@@ -0,0 +1,101 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;

use super::RowEncoder;

pub struct BytesEncoder {
pub schema: Schema,
// index of the single primary-key column to encode; the column must be of type bytea
pub col_index: usize,
}

impl BytesEncoder {
pub fn new(schema: Schema, col_index: usize) -> Self {
Self { schema, col_index }
}
}

impl RowEncoder for BytesEncoder {
type Output = Vec<u8>;

fn schema(&self) -> &risingwave_common::catalog::Schema {
&self.schema
}

fn col_indices(&self) -> Option<&[usize]> {
Some(std::slice::from_ref(&self.col_index))
}

fn encode_cols(
&self,
row: impl risingwave_common::row::Row,
col_indices: impl Iterator<Item = usize>,
) -> crate::sink::Result<Self::Output> {
// It is guaranteed by the caller that col_indices contains only one element
let mut result = Vec::new();
for col_index in col_indices {
let datum = row.datum_at(col_index);
let data_type = &self.schema.fields[col_index].data_type;
if data_type == &DataType::Bytea {
if let Some(scalar_impl) = datum {
result = scalar_impl.into_bytea().to_vec();
} else {
result = vec![];
}
} else {
return Err(crate::sink::SinkError::Encode(format!(
"Unsupported data type: expected bytea, got {}",
data_type
)));
}
}

Ok(result)
}
}

#[cfg(test)]
mod test {
use risingwave_common::catalog::Field;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::ScalarImpl;

use super::*;

#[test]
fn test_bytes_encoder_ser_bytes() {
let schema = Schema::new(vec![Field::with_name(DataType::Bytea, "col1")]);
let encoder = BytesEncoder::new(schema, 0);

let row = OwnedRow::new(vec![Some(ScalarImpl::Bytea(b"some_bytes".to_vec().into()))]);
assert_eq!(
encoder.encode_cols(&row, std::iter::once(0)).unwrap(),
b"some_bytes".to_vec()
);

let row = OwnedRow::new(vec![None]);
assert_eq!(
encoder.encode_cols(&row, std::iter::once(0)).unwrap(),
Vec::<u8>::new()
);

let schema = Schema::new(vec![Field::with_name(DataType::Int16, "col1")]);
let encoder = BytesEncoder::new(schema, 0);
let row = OwnedRow::new(vec![Some(ScalarImpl::Int16(123))]);
assert!(encoder.encode_cols(&row, std::iter::once(0)).is_err());
}
}
1 change: 1 addition & 0 deletions src/connector/src/sink/encoder/mod.rs
@@ -22,6 +22,7 @@ use crate::sink::Result;

mod avro;
mod bson;
pub mod bytes;
mod json;
mod proto;
pub mod template;