feat: set default message queue key to format plain (#13278)
Co-authored-by: jiamin.huang <[email protected]>
keven-huang and jiamin.huang authored Nov 10, 2023
1 parent 8004505 commit 4c78170
Showing 15 changed files with 216 additions and 120 deletions.
12 changes: 12 additions & 0 deletions e2e_test/source/basic/kafka.slt
@@ -363,6 +363,15 @@ create table s27 with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON (schema.location = 'file:///risingwave/json-complex-schema')

# currently _rw_key can be set as primary key
statement ok
create table s28 (id bytea, PRIMARY KEY(_rw_key)) with (
connector = 'kafka',
topic = 'kafka_source_format_bytes',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE BYTES

statement ok
CREATE TABLE mongo_customers (
_id BIGINT PRIMARY KEY,
@@ -830,6 +839,9 @@ drop source s24
statement ok
drop table s27

statement ok
drop table s28

statement ok
DROP TABLE mongo_customers;

140 changes: 77 additions & 63 deletions src/connector/src/parser/avro/parser.rs
@@ -207,7 +207,7 @@ mod test {
use apache_avro::{Codec, Days, Duration, Millis, Months, Reader, Schema, Writer};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::catalog::ColumnId;
use risingwave_common::catalog::{ColumnId, DEFAULT_KEY_COLUMN_NAME};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Date, Interval, ScalarImpl, Timestamptz};
use risingwave_common::{error, try_match_expand};
@@ -220,10 +220,12 @@ mod test {
AvroParserConfig,
};
use crate::aws_auth::AwsAuthProps;
use crate::parser::bytes_parser::BytesAccessBuilder;
use crate::parser::plain_parser::PlainParser;
use crate::parser::unified::avro::unix_epoch_days;
use crate::parser::{
AccessBuilderImpl, EncodingType, SourceStreamChunkBuilder, SpecificParserConfig,
AccessBuilderImpl, BytesProperties, EncodingProperties, EncodingType,
SourceStreamChunkBuilder, SpecificParserConfig,
};
use crate::source::SourceColumnDesc;

@@ -309,6 +311,11 @@ mod test {
let conf = new_avro_conf_from_local(file_name).await?;

Ok(PlainParser {
key_builder: AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
EncodingProperties::Bytes(BytesProperties {
column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()),
}),
)?),
payload_builder: AccessBuilderImpl::Avro(AvroAccessBuilder::new(
conf,
EncodingType::Value,
@@ -332,68 +339,75 @@ mod test {
let flush = writer.flush().unwrap();
assert!(flush > 0);
let input_data = writer.into_inner().unwrap();
// _rw_key test cases
let key_testcases = vec![Some(br#"r"#.to_vec()), Some(vec![]), None];
let columns = build_rw_columns();
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 1);
{
let writer = builder.row_writer();
parser.parse_inner(input_data, writer).await.unwrap();
}
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
assert_eq!(op, Op::Insert);
let row = row.into_owned_row();
for (i, field) in record.fields.iter().enumerate() {
let value = field.clone().1;
match value {
Value::String(str) | Value::Union(_, box Value::String(str)) => {
assert_eq!(row[i], Some(ScalarImpl::Utf8(str.into_boxed_str())));
}
Value::Boolean(bool_val) => {
assert_eq!(row[i], Some(ScalarImpl::Bool(bool_val)));
}
Value::Int(int_val) => {
assert_eq!(row[i], Some(ScalarImpl::Int32(int_val)));
}
Value::Long(i64_val) => {
assert_eq!(row[i], Some(ScalarImpl::Int64(i64_val)));
}
Value::Float(f32_val) => {
assert_eq!(row[i], Some(ScalarImpl::Float32(f32_val.into())));
}
Value::Double(f64_val) => {
assert_eq!(row[i], Some(ScalarImpl::Float64(f64_val.into())));
}
Value::Date(days) => {
assert_eq!(
row[i],
Some(ScalarImpl::Date(
Date::with_days(days + unix_epoch_days()).unwrap(),
))
);
}
Value::TimestampMillis(millis) => {
assert_eq!(
row[i],
Some(Timestamptz::from_millis(millis).unwrap().into())
);
}
Value::TimestampMicros(micros) => {
assert_eq!(row[i], Some(Timestamptz::from_micros(micros).into()));
}
Value::Bytes(bytes) => {
assert_eq!(row[i], Some(ScalarImpl::Bytea(bytes.into_boxed_slice())));
}
Value::Duration(duration) => {
let months = u32::from(duration.months()) as i32;
let days = u32::from(duration.days()) as i32;
let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
assert_eq!(
row[i],
Some(Interval::from_month_day_usec(months, days, usecs).into())
);
}
_ => {
unreachable!()
for key_data in key_testcases {
let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 1);
{
let writer = builder.row_writer();
parser
.parse_inner(key_data, Some(input_data.clone()), writer)
.await
.unwrap();
}
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
assert_eq!(op, Op::Insert);
let row = row.into_owned_row();
for (i, field) in record.fields.iter().enumerate() {
let value = field.clone().1;
match value {
Value::String(str) | Value::Union(_, box Value::String(str)) => {
assert_eq!(row[i], Some(ScalarImpl::Utf8(str.into_boxed_str())));
}
Value::Boolean(bool_val) => {
assert_eq!(row[i], Some(ScalarImpl::Bool(bool_val)));
}
Value::Int(int_val) => {
assert_eq!(row[i], Some(ScalarImpl::Int32(int_val)));
}
Value::Long(i64_val) => {
assert_eq!(row[i], Some(ScalarImpl::Int64(i64_val)));
}
Value::Float(f32_val) => {
assert_eq!(row[i], Some(ScalarImpl::Float32(f32_val.into())));
}
Value::Double(f64_val) => {
assert_eq!(row[i], Some(ScalarImpl::Float64(f64_val.into())));
}
Value::Date(days) => {
assert_eq!(
row[i],
Some(ScalarImpl::Date(
Date::with_days(days + unix_epoch_days()).unwrap(),
))
);
}
Value::TimestampMillis(millis) => {
assert_eq!(
row[i],
Some(Timestamptz::from_millis(millis).unwrap().into())
);
}
Value::TimestampMicros(micros) => {
assert_eq!(row[i], Some(Timestamptz::from_micros(micros).into()));
}
Value::Bytes(bytes) => {
assert_eq!(row[i], Some(ScalarImpl::Bytea(bytes.into_boxed_slice())));
}
Value::Duration(duration) => {
let months = u32::from(duration.months()) as i32;
let days = u32::from(duration.days()) as i32;
let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
assert_eq!(
row[i],
Some(Interval::from_month_day_usec(months, days, usecs).into())
);
}
_ => {
unreachable!()
}
}
}
}
19 changes: 13 additions & 6 deletions src/connector/src/parser/bytes_parser.rs
@@ -55,11 +55,15 @@ mod tests {
SourceStreamChunkBuilder, SpecificParserConfig,
};

fn get_payload() -> Vec<Vec<u8>> {
vec![br#"t"#.to_vec(), br#"random"#.to_vec()]
type Item = (Vec<u8>, Vec<u8>);
fn get_item() -> Vec<Item> {
vec![
(br#"a"#.to_vec(), br#"t"#.to_vec()),
(br#"r"#.to_vec(), br#"random"#.to_vec()),
]
}

async fn test_bytes_parser(get_payload: fn() -> Vec<Vec<u8>>) {
async fn test_bytes_parser(get_item: fn() -> Vec<Item>) {
let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
let props = SpecificParserConfig {
key_encoding_config: None,
@@ -72,9 +76,12 @@

let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);

for payload in get_payload() {
for item in get_item() {
let writer = builder.row_writer();
parser.parse_inner(payload, writer).await.unwrap();
parser
.parse_inner(Some(item.0), Some(item.1), writer)
.await
.unwrap();
}

let chunk = builder.finish();
@@ -100,6 +107,6 @@

#[tokio::test]
async fn test_bytes_parse_object_top_level() {
test_bytes_parser(get_payload).await;
test_bytes_parser(get_item).await;
}
}
40 changes: 32 additions & 8 deletions src/connector/src/parser/plain_parser.rs
@@ -12,20 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::DEFAULT_KEY_COLUMN_NAME;
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{ErrorCode, Result, RwError};

use super::unified::util::apply_row_accessor_on_stream_chunk_writer;
use super::bytes_parser::BytesAccessBuilder;
use super::unified::util::apply_key_val_accessor_on_stream_chunk_writer;
use super::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties, EncodingType,
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::only_parse_payload;
use crate::parser::ParserFormat;
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
pub struct PlainParser {
pub key_builder: AccessBuilderImpl,
pub payload_builder: AccessBuilderImpl,
pub(crate) rw_columns: Vec<SourceColumnDesc>,
pub source_ctx: SourceContextRef,
@@ -37,6 +39,11 @@ impl PlainParser {
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> Result<Self> {
let key_builder = AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
EncodingProperties::Bytes(BytesProperties {
column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()),
}),
)?);
let payload_builder = match props.encoding_config {
EncodingProperties::Protobuf(_)
| EncodingProperties::Avro(_)
@@ -50,6 +57,7 @@
}
};
Ok(Self {
key_builder,
payload_builder,
rw_columns,
source_ctx,
@@ -58,12 +66,28 @@

pub async fn parse_inner(
&mut self,
payload: Vec<u8>,
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<()> {
let accessor = self.payload_builder.generate_accessor(payload).await?;
// if key is empty, set it as vec![]
let key_data = key.unwrap_or_default();
// if payload is empty, report error
let payload_data = payload.ok_or_else(|| {
RwError::from(ErrorCode::InternalError(
"Empty payload with nonempty key".into(),
))
})?;

apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer).map_err(Into::into)
let key_accessor = self.key_builder.generate_accessor(key_data).await?;
let payload_accessor = self.payload_builder.generate_accessor(payload_data).await?;
apply_key_val_accessor_on_stream_chunk_writer(
DEFAULT_KEY_COLUMN_NAME,
key_accessor,
payload_accessor,
&mut writer,
)
.map_err(Into::into)
}
}

@@ -82,10 +106,10 @@ impl ByteStreamSourceParser for PlainParser {

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<()> {
only_parse_payload!(self, payload, writer)
self.parse_inner(key, payload, writer).await
}
}
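
As a reading aid (not part of the diff above): a minimal standalone Rust sketch, using hypothetical names and plain std types instead of the crate's accessor machinery, of the Option handling the new PlainParser::parse_inner performs — a missing key falls back to an empty byte vector, while a missing payload is rejected.

// Illustrative sketch only; `split_record` is a hypothetical helper, not part of the crate.
fn split_record(
    key: Option<Vec<u8>>,
    payload: Option<Vec<u8>>,
) -> Result<(Vec<u8>, Vec<u8>), String> {
    // A missing key becomes an empty key, so the key column can still be written
    // (as an empty value) instead of failing the whole record.
    let key = key.unwrap_or_default();
    // A missing payload cannot populate the non-key columns, so it is an error.
    let payload = payload.ok_or_else(|| "empty payload".to_string())?;
    Ok((key, payload))
}

fn main() {
    // Key absent: defaults to an empty Vec.
    let (key, payload) = split_record(None, Some(b"v".to_vec())).unwrap();
    assert!(key.is_empty());
    assert_eq!(payload, b"v".to_vec());
    // Payload absent: rejected, mirroring the "Empty payload with nonempty key" error above.
    assert!(split_record(Some(b"k".to_vec()), None).is_err());
}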
18 changes: 17 additions & 1 deletion src/connector/src/parser/unified/util.rs
@@ -14,7 +14,7 @@

use risingwave_common::error::{ErrorCode, RwError};

use super::{Access, AccessError, AccessResult, ChangeEvent};
use super::{Access, AccessError, AccessImpl, AccessResult, ChangeEvent};
use crate::parser::unified::ChangeEventOperation;
use crate::parser::SourceStreamChunkRowWriter;
use crate::source::SourceColumnDesc;
@@ -46,6 +46,22 @@ pub fn apply_row_accessor_on_stream_chunk_writer(
writer.insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
}

pub fn apply_key_val_accessor_on_stream_chunk_writer(
key_column_name: &str,
key_accessor: AccessImpl<'_, '_>,
val_accessor: AccessImpl<'_, '_>,
writer: &mut SourceStreamChunkRowWriter<'_>,
) -> AccessResult<()> {
let f = |column: &SourceColumnDesc| {
if column.name == key_column_name {
key_accessor.access(&[&column.name], Some(&column.data_type))
} else {
val_accessor.access(&[&column.name], Some(&column.data_type))
}
};
writer.insert(f)
}

impl From<AccessError> for RwError {
fn from(val: AccessError) -> Self {
ErrorCode::InternalError(format!("AccessError: {:?}", val)).into()
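
For orientation (again, not part of the diff): a toy model of the per-column routing that apply_key_val_accessor_on_stream_chunk_writer introduces — the single key column is served from the record key, every other column from the record payload. The types below are illustrative stand-ins, not the real SourceColumnDesc or AccessImpl.

// Illustrative stand-ins; the real code routes AccessImpl accessors, not raw bytes.
struct ColumnDesc {
    name: &'static str,
}

fn fill_row<'a>(
    key_column_name: &str,
    columns: &[ColumnDesc],
    key: &'a [u8],
    payload: &'a [u8],
) -> Vec<(&'static str, &'a [u8])> {
    columns
        .iter()
        .map(|col| {
            // Only the designated key column (e.g. "_rw_key") reads from the key;
            // all remaining columns read from the payload.
            let bytes = if col.name == key_column_name { key } else { payload };
            (col.name, bytes)
        })
        .collect()
}

fn main() {
    let columns = [ColumnDesc { name: "id" }, ColumnDesc { name: "_rw_key" }];
    let row = fill_row("_rw_key", &columns, b"key-bytes", b"payload-bytes");
    assert_eq!(row[0].1, &b"payload-bytes"[..]);
    assert_eq!(row[1].1, &b"key-bytes"[..]);
}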
@@ -3,11 +3,11 @@
select * from s
logical_plan: |-
LogicalProject { exprs: [id, value] }
└─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], time_range: (Unbounded, Unbounded) }
└─LogicalSource { source: s, columns: [id, value, _rw_key, _rw_kafka_timestamp, _row_id], time_range: (Unbounded, Unbounded) }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [id, value] }
└─BatchSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchSource { source: s, columns: [id, value, _rw_key, _rw_kafka_timestamp, _row_id], filter: (None, None) }
create_source:
format: plain
encode: protobuf
(remaining changed files not shown)
