Skip to content

Commit

Permalink
feat: impl key encode clause for sink (#16377)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored May 8, 2024
1 parent ffb6ae5 commit 7b3cfe0
Show file tree
Hide file tree
Showing 19 changed files with 446 additions and 61 deletions.
1 change: 1 addition & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set -euo pipefail
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-text-key-id --create > /dev/null 2>&1

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
sleep 2
Expand Down
74 changes: 72 additions & 2 deletions e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,61 @@ create sink invalid_pk_column from t_kafka with (
primary_key = 'id,invalid'
);

### Test sink with key encode ###

statement error sink key encode unsupported: JSON, only TEXT supported
create sink sink_text_error from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-text-key-id',
primary_key = 'id')
format plain encode json (
force_append_only='true'
) key encode json ;

statement error
# The key encode is TEXT, but the primary key has 2 columns. The key encode TEXT requires the primary key to be a single column.
create sink sink_text_error from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-text-key-id',
primary_key = 'id, v_varchar')
format plain encode json (
force_append_only='true'
) key encode text ;

statement error
# The key encode is TEXT, but the primary key column v_bytea has type bytea. The key encode TEXT requires the primary key column to be of type varchar, bool, small int, int, or big int.
create sink sink_text_error from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-text-key-id',
primary_key = 'v_bytea')
format plain encode json (
force_append_only='true'
) key encode text ;

statement ok
create sink sink_text_id from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-text-key-id',
primary_key = 'id')
format plain encode json (
force_append_only='true'
) key encode text ;

statement ok
create table t_sink_text_id (id int)
include key as rw_key
with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-text-key-id',
) format plain encode json;

#======

statement ok
insert into t_kafka values
(1, '8DfUFencLe', 31031, 1872, 1872, 26261.416, 23956.39329760601, '2023-04-14 06:27:14.104742', '\x00', '0 second', '0001-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
Expand All @@ -193,7 +248,6 @@ insert into t_kafka values
(6, 'V4y71v4Gip', 4014, 10844, 28842, 5885.368, 11210.458724794062, '2023-04-13 10:42:02.137742', '\xCAFEBABE', '4 hour', '0001-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(7, 'YIVLnWxHyf', 10324, 12652, 15914, 3946.7434, 10967.182297153104, '2023-04-14 04:41:03.083742', '\xBABEC0DE', '3 day', '0001-01-01', '01:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}');


statement error
create sink si_kafka_without_snapshot as select * from t_kafka with (
connector = 'kafka',
Expand All @@ -216,8 +270,24 @@ create sink si_kafka_without_snapshot from t_kafka with (
snapshot = 'false',
);

sleep 1s

query T
select rw_key from t_sink_text_id order by rw_key
----
\x31
\x32
\x33
\x34
\x35
\x36
\x37

statement ok
insert into t_kafka values
(8, 'lv7Eq3g8hx', 194, 19036, 28641, 13652.073, 993.408963466774, '2023-04-13 13:52:09.356742', '\xDEADBABE', '04:00:00.1234', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(9, 'nwRq4zejSQ', 10028, 20090, 24837, 20699.559, 11615.276406159757, '2023-04-13 12:40:42.487742', '\xDEADBABE', '05:01:00.123456', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(10, '0oVqRIHqkb', 26951, 20674, 20674, 19387.238, 9042.404483827515, '2023-04-13 16:40:58.888742', '\x00', '05:01:00.1234567', '1970-01-01', '00:00:01.123456', '1970-01-01 00:00:00.123456'::timestamptz, '{}');
(10, '0oVqRIHqkb', 26951, 20674, 20674, 19387.238, 9042.404483827515, '2023-04-13 16:40:58.888742', '\x00', '05:01:00.1234567', '1970-01-01', '00:00:01.123456', '1970-01-01 00:00:00.123456'::timestamptz, '{}');

statement ok
drop table t_sink_text_id;
3 changes: 3 additions & 0 deletions e2e_test/sink/kafka/drop_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,8 @@ drop sink si_kafka_upsert_schema;
statement ok
drop sink si_kafka_without_snapshot;

statement ok
drop sink sink_text_id;

statement ok
drop table t_kafka;
1 change: 1 addition & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ message SinkFormatDesc {
plan_common.FormatType format = 1;
plan_common.EncodeType encode = 2;
map<string, string> options = 3;
optional plan_common.EncodeType key_encode = 4;
}

// the catalog of the sink. There are two kind of schema here. The full schema is all columns
Expand Down
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ enum EncodeType {
ENCODE_TYPE_BYTES = 6;
ENCODE_TYPE_TEMPLATE = 7;
ENCODE_TYPE_NONE = 8;
ENCODE_TYPE_TEXT = 9;
}

enum RowFormatType {
Expand Down
1 change: 1 addition & 0 deletions src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ fn mock_from_legacy_type(
format,
encode: SinkEncode::Json,
options: Default::default(),
key_encode: None,
}))
} else {
SinkFormatDesc::from_legacy_type(connector, r#type)
Expand Down
31 changes: 29 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ pub struct SinkFormatDesc {
pub format: SinkFormat,
pub encode: SinkEncode,
pub options: BTreeMap<String, String>,

pub key_encode: Option<SinkEncode>,
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
Expand All @@ -136,6 +138,7 @@ pub enum SinkEncode {
Protobuf,
Avro,
Template,
Text,
}

impl SinkFormatDesc {
Expand Down Expand Up @@ -166,6 +169,7 @@ impl SinkFormatDesc {
format,
encode,
options: Default::default(),
key_encode: None,
}))
}

Expand All @@ -177,12 +181,16 @@ impl SinkFormatDesc {
SinkFormat::Upsert => F::Upsert,
SinkFormat::Debezium => F::Debezium,
};
let encode = match self.encode {
let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
SinkEncode::Json => E::Json,
SinkEncode::Protobuf => E::Protobuf,
SinkEncode::Avro => E::Avro,
SinkEncode::Template => E::Template,
SinkEncode::Text => E::Text,
};

let encode = mapping_encode(&self.encode);
let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
let options = self
.options
.iter()
Expand All @@ -193,6 +201,7 @@ impl SinkFormatDesc {
format: format.into(),
encode: encode.into(),
options,
key_encode,
}
}
}
Expand Down Expand Up @@ -224,19 +233,37 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
E::Protobuf => SinkEncode::Protobuf,
E::Template => SinkEncode::Template,
E::Avro => SinkEncode::Avro,
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None) => {
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
return Err(SinkError::Config(anyhow!(
"sink encode unsupported: {}",
e.as_str_name()
)))
}
};
let key_encode = match &value.key_encode() {
E::Text => Some(SinkEncode::Text),
E::Unspecified => None,
encode @ (E::Avro
| E::Bytes
| E::Csv
| E::Json
| E::Protobuf
| E::Template
| E::Native
| E::None) => {
return Err(SinkError::Config(anyhow!(
"unsupported {} as sink key encode",
encode.as_str_name()
)))
}
};
let options = value.options.into_iter().collect();

Ok(Self {
format,
encode,
options,
key_encode,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod avro;
mod json;
mod proto;
pub mod template;
pub mod text;

pub use avro::{AvroEncoder, AvroHeader};
pub use json::JsonEncoder;
Expand Down
108 changes: 108 additions & 0 deletions src/connector/src/sink/encoder/text.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::Schema;
use risingwave_common::types::{DataType, ToText};

use super::RowEncoder;

/// Encodes one column of a row as its plain-text representation.
///
/// Used to serialize the message key of a sink declared with
/// `KEY ENCODE TEXT` (see the e2e tests and catalog changes in this commit),
/// where the primary key is required to be a single column.
pub struct TextEncoder {
    pub schema: Schema,
    // Index of the single column to encode; the caller guarantees exactly
    // one column is selected (single-column primary key).
    pub col_index: usize,
}

impl TextEncoder {
pub fn new(schema: Schema, col_index: usize) -> Self {
Self { schema, col_index }
}
}

impl RowEncoder for TextEncoder {
type Output = String;

fn schema(&self) -> &risingwave_common::catalog::Schema {
&self.schema
}

fn col_indices(&self) -> Option<&[usize]> {
Some(std::slice::from_ref(&self.col_index))
}

fn encode_cols(
&self,
row: impl risingwave_common::row::Row,
col_indices: impl Iterator<Item = usize>,
) -> crate::sink::Result<Self::Output> {
// It is guaranteed by the caller that col_indices contains only one element
let mut result = String::new();
for col_index in col_indices {
let datum = row.datum_at(col_index);
let data_type = &self.schema.fields[col_index].data_type;
if data_type == &DataType::Boolean {
result = if let Some(scalar_impl) = datum {
scalar_impl.into_bool().to_string()
} else {
"NULL".to_string()
}
} else {
result = datum.to_text_with_type(data_type);
}
}

Ok(result)
}
}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::Field;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use super::*;

    /// Booleans must encode as "true"/"false", and a NULL datum as "NULL".
    #[test]
    fn test_text_encoder_ser_bool() {
        let schema = Schema::new(vec![Field::with_name(DataType::Boolean, "col1")]);
        let encoder = TextEncoder::new(schema, 0);

        let cases = [
            (Some(ScalarImpl::Bool(true)), "true"),
            (Some(ScalarImpl::Bool(false)), "false"),
            (None, "NULL"),
        ];
        for (datum, expected) in cases {
            let row = OwnedRow::new(vec![datum]);
            let encoded = encoder.encode_cols(&row, std::iter::once(0)).unwrap();
            assert_eq!(encoded, expected);
        }
    }
}
Loading

0 comments on commit 7b3cfe0

Please sign in to comment.