fix: revert #13278 & #13390 for include syntax (#13785)
tabVersion authored Dec 4, 2023
1 parent f33a5d0 commit e41b62f
Showing 11 changed files with 111 additions and 231 deletions.
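In effect, this revert removes the implicit `_rw_key` key column that #13278 and #13390 had wired into `FORMAT PLAIN` sources: the e2e test declaring `PRIMARY KEY(_rw_key)` on an `ENCODE BYTES` table is commented out, `PlainParser` loses its `key_builder` and returns to a payload-only `parse_inner`, and the key/value accessor helper in `unified/util.rs` is deleted. Judging by the commit title alone, key columns are presumably intended to come back through an explicit `INCLUDE` syntax instead.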
20 changes: 10 additions & 10 deletions e2e_test/source/basic/kafka.slt
@@ -368,17 +368,17 @@ create table s27 with (
 ) FORMAT PLAIN ENCODE JSON (schema.location = 'file:///risingwave/json-complex-schema')
 
 # currently _rw_key can be set as primary key
-statement ok
-create table s28 (id bytea, PRIMARY KEY(_rw_key)) with (
-  connector = 'kafka',
-  topic = 'kafka_source_format_bytes',
-  properties.bootstrap.server = 'message_queue:29092',
-  scan.startup.mode = 'earliest'
-) FORMAT PLAIN ENCODE BYTES
+# statement ok
+# create table s28 (id bytea, PRIMARY KEY(_rw_key)) with (
+#   connector = 'kafka',
+#   topic = 'kafka_source_format_bytes',
+#   properties.bootstrap.server = 'message_queue:29092',
+#   scan.startup.mode = 'earliest'
+# ) FORMAT PLAIN ENCODE BYTES
 
 # throttle option
 statement ok
-create table s29 (id bytea, PRIMARY KEY(_rw_key)) with (
+create table s29 (id bytea) with (
   connector = 'kafka',
   topic = 'kafka_source_format_bytes',
   properties.bootstrap.server = 'message_queue:29092',
@@ -858,8 +858,8 @@ drop source s24
 statement ok
 drop table s27
 
-statement ok
-drop table s28
+# statement ok
+# drop table s28
 
 statement ok
 drop table s29
140 changes: 63 additions & 77 deletions src/connector/src/parser/avro/parser.rs
@@ -208,7 +208,7 @@ mod test {
     use apache_avro::{Codec, Days, Duration, Millis, Months, Reader, Schema, Writer};
     use itertools::Itertools;
     use risingwave_common::array::Op;
-    use risingwave_common::catalog::{ColumnId, DEFAULT_KEY_COLUMN_NAME};
+    use risingwave_common::catalog::ColumnId;
    use risingwave_common::row::Row;
     use risingwave_common::types::{DataType, Date, Interval, ScalarImpl, Timestamptz};
     use risingwave_common::{error, try_match_expand};
@@ -221,12 +221,10 @@ mod test {
         AvroParserConfig,
     };
     use crate::common::AwsAuthProps;
-    use crate::parser::bytes_parser::BytesAccessBuilder;
     use crate::parser::plain_parser::PlainParser;
     use crate::parser::unified::avro::unix_epoch_days;
     use crate::parser::{
-        AccessBuilderImpl, BytesProperties, EncodingProperties, EncodingType,
-        SourceStreamChunkBuilder, SpecificParserConfig,
+        AccessBuilderImpl, EncodingType, SourceStreamChunkBuilder, SpecificParserConfig,
     };
     use crate::source::SourceColumnDesc;
 
@@ -307,11 +305,6 @@ mod test {
         let conf = new_avro_conf_from_local(file_name).await?;
 
         Ok(PlainParser {
-            key_builder: AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
-                EncodingProperties::Bytes(BytesProperties {
-                    column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()),
-                }),
-            )?),
             payload_builder: AccessBuilderImpl::Avro(AvroAccessBuilder::new(
                 conf,
                 EncodingType::Value,
@@ -335,75 +328,68 @@
         let flush = writer.flush().unwrap();
         assert!(flush > 0);
         let input_data = writer.into_inner().unwrap();
-        // _rw_key test cases
-        let key_testcases = vec![Some(br#"r"#.to_vec()), Some(vec![]), None];
         let columns = build_rw_columns();
-        for key_data in key_testcases {
-            let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 1);
-            {
-                let writer = builder.row_writer();
-                parser
-                    .parse_inner(key_data, Some(input_data.clone()), writer)
-                    .await
-                    .unwrap();
-            }
-            let chunk = builder.finish();
-            let (op, row) = chunk.rows().next().unwrap();
-            assert_eq!(op, Op::Insert);
-            let row = row.into_owned_row();
-            for (i, field) in record.fields.iter().enumerate() {
-                let value = field.clone().1;
-                match value {
-                    Value::String(str) | Value::Union(_, box Value::String(str)) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Utf8(str.into_boxed_str())));
-                    }
-                    Value::Boolean(bool_val) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Bool(bool_val)));
-                    }
-                    Value::Int(int_val) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Int32(int_val)));
-                    }
-                    Value::Long(i64_val) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Int64(i64_val)));
-                    }
-                    Value::Float(f32_val) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Float32(f32_val.into())));
-                    }
-                    Value::Double(f64_val) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Float64(f64_val.into())));
-                    }
-                    Value::Date(days) => {
-                        assert_eq!(
-                            row[i],
-                            Some(ScalarImpl::Date(
-                                Date::with_days(days + unix_epoch_days()).unwrap(),
-                            ))
-                        );
-                    }
-                    Value::TimestampMillis(millis) => {
-                        assert_eq!(
-                            row[i],
-                            Some(Timestamptz::from_millis(millis).unwrap().into())
-                        );
-                    }
-                    Value::TimestampMicros(micros) => {
-                        assert_eq!(row[i], Some(Timestamptz::from_micros(micros).into()));
-                    }
-                    Value::Bytes(bytes) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Bytea(bytes.into_boxed_slice())));
-                    }
-                    Value::Duration(duration) => {
-                        let months = u32::from(duration.months()) as i32;
-                        let days = u32::from(duration.days()) as i32;
-                        let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
-                        assert_eq!(
-                            row[i],
-                            Some(Interval::from_month_day_usec(months, days, usecs).into())
-                        );
-                    }
-                    _ => {
-                        unreachable!()
-                    }
+        let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 1);
+        {
+            let writer = builder.row_writer();
+            parser.parse_inner(input_data, writer).await.unwrap();
+        }
+        let chunk = builder.finish();
+        let (op, row) = chunk.rows().next().unwrap();
+        assert_eq!(op, Op::Insert);
+        let row = row.into_owned_row();
+        for (i, field) in record.fields.iter().enumerate() {
+            let value = field.clone().1;
+            match value {
+                Value::String(str) | Value::Union(_, box Value::String(str)) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Utf8(str.into_boxed_str())));
+                }
+                Value::Boolean(bool_val) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Bool(bool_val)));
+                }
+                Value::Int(int_val) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Int32(int_val)));
+                }
+                Value::Long(i64_val) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Int64(i64_val)));
+                }
+                Value::Float(f32_val) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Float32(f32_val.into())));
+                }
+                Value::Double(f64_val) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Float64(f64_val.into())));
+                }
+                Value::Date(days) => {
+                    assert_eq!(
+                        row[i],
+                        Some(ScalarImpl::Date(
+                            Date::with_days(days + unix_epoch_days()).unwrap(),
+                        ))
+                    );
+                }
+                Value::TimestampMillis(millis) => {
+                    assert_eq!(
+                        row[i],
+                        Some(Timestamptz::from_millis(millis).unwrap().into())
+                    );
+                }
+                Value::TimestampMicros(micros) => {
+                    assert_eq!(row[i], Some(Timestamptz::from_micros(micros).into()));
+                }
+                Value::Bytes(bytes) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Bytea(bytes.into_boxed_slice())));
+                }
+                Value::Duration(duration) => {
+                    let months = u32::from(duration.months()) as i32;
+                    let days = u32::from(duration.days()) as i32;
+                    let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
+                    assert_eq!(
+                        row[i],
+                        Some(Interval::from_month_day_usec(months, days, usecs).into())
+                    );
+                }
+                _ => {
+                    unreachable!()
                 }
             }
         }
19 changes: 6 additions & 13 deletions src/connector/src/parser/bytes_parser.rs
@@ -55,15 +55,11 @@ mod tests {
         SourceStreamChunkBuilder, SpecificParserConfig,
     };
 
-    type Item = (Vec<u8>, Vec<u8>);
-    fn get_item() -> Vec<Item> {
-        vec![
-            (br#"a"#.to_vec(), br#"t"#.to_vec()),
-            (br#"r"#.to_vec(), br#"random"#.to_vec()),
-        ]
+    fn get_payload() -> Vec<Vec<u8>> {
+        vec![br#"t"#.to_vec(), br#"random"#.to_vec()]
     }
 
-    async fn test_bytes_parser(get_item: fn() -> Vec<Item>) {
+    async fn test_bytes_parser(get_payload: fn() -> Vec<Vec<u8>>) {
         let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
         let props = SpecificParserConfig {
             key_encoding_config: None,
@@ -76,12 +72,9 @@
 
         let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
 
-        for item in get_item() {
+        for payload in get_payload() {
             let writer = builder.row_writer();
-            parser
-                .parse_inner(Some(item.0), Some(item.1), writer)
-                .await
-                .unwrap();
+            parser.parse_inner(payload, writer).await.unwrap();
         }
 
         let chunk = builder.finish();
@@ -107,6 +100,6 @@
 
     #[tokio::test]
     async fn test_bytes_parse_object_top_level() {
-        test_bytes_parser(get_item).await;
+        test_bytes_parser(get_payload).await;
     }
 }
40 changes: 8 additions & 32 deletions src/connector/src/parser/plain_parser.rs
@@ -12,22 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use risingwave_common::catalog::DEFAULT_KEY_COLUMN_NAME;
 use risingwave_common::error::ErrorCode::ProtocolError;
 use risingwave_common::error::{ErrorCode, Result, RwError};
 
-use super::bytes_parser::BytesAccessBuilder;
-use super::unified::util::apply_key_val_accessor_on_stream_chunk_writer;
+use super::unified::util::apply_row_accessor_on_stream_chunk_writer;
 use super::{
-    AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties, EncodingType,
+    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
     SourceStreamChunkRowWriter, SpecificParserConfig,
 };
+use crate::only_parse_payload;
 use crate::parser::ParserFormat;
 use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
 
 #[derive(Debug)]
 pub struct PlainParser {
-    pub key_builder: AccessBuilderImpl,
     pub payload_builder: AccessBuilderImpl,
     pub(crate) rw_columns: Vec<SourceColumnDesc>,
     pub source_ctx: SourceContextRef,
@@ -39,11 +37,6 @@ impl PlainParser {
         rw_columns: Vec<SourceColumnDesc>,
         source_ctx: SourceContextRef,
     ) -> Result<Self> {
-        let key_builder = AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
-            EncodingProperties::Bytes(BytesProperties {
-                column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()),
-            }),
-        )?);
         let payload_builder = match props.encoding_config {
             EncodingProperties::Protobuf(_)
             | EncodingProperties::Avro(_)
@@ -57,7 +50,6 @@
             }
         };
         Ok(Self {
-            key_builder,
             payload_builder,
             rw_columns,
             source_ctx,
@@ -66,28 +58,12 @@
 
     pub async fn parse_inner(
         &mut self,
-        key: Option<Vec<u8>>,
-        payload: Option<Vec<u8>>,
+        payload: Vec<u8>,
         mut writer: SourceStreamChunkRowWriter<'_>,
     ) -> Result<()> {
-        // if key is empty, set it as vec![]
-        let key_data = key.unwrap_or_default();
-        // if payload is empty, report error
-        let payload_data = payload.ok_or_else(|| {
-            RwError::from(ErrorCode::InternalError(
-                "Empty payload with nonempty key".into(),
-            ))
-        })?;
+        let accessor = self.payload_builder.generate_accessor(payload).await?;
 
-        let key_accessor = self.key_builder.generate_accessor(key_data).await?;
-        let payload_accessor = self.payload_builder.generate_accessor(payload_data).await?;
-        apply_key_val_accessor_on_stream_chunk_writer(
-            DEFAULT_KEY_COLUMN_NAME,
-            key_accessor,
-            payload_accessor,
-            &mut writer,
-        )
-        .map_err(Into::into)
+        apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer).map_err(Into::into)
     }
 }
 
@@ -106,10 +82,10 @@ impl ByteStreamSourceParser for PlainParser {
 
     async fn parse_one<'a>(
         &'a mut self,
-        key: Option<Vec<u8>>,
+        _key: Option<Vec<u8>>,
         payload: Option<Vec<u8>>,
         writer: SourceStreamChunkRowWriter<'a>,
     ) -> Result<()> {
-        self.parse_inner(key, payload, writer).await
+        only_parse_payload!(self, payload, writer)
     }
 }
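Note that `parse_one` now routes through the `only_parse_payload!` macro instead of forwarding the key bytes. The macro's definition is outside this diff; the sketch below is a plausible expansion, inferred from the payload-only `parse_inner` signature and the file's retained `ProtocolError`/`RwError` imports (the exact error variant and message are assumptions):

// Hypothetical expansion of `only_parse_payload!` -- the real definition is
// not part of this diff, so the error variant and wording are assumptions.
macro_rules! only_parse_payload {
    ($self:ident, $payload:ident, $writer:ident) => {
        if let Some(payload) = $payload {
            // Any key bytes passed to `parse_one` are simply ignored.
            $self.parse_inner(payload, $writer).await
        } else {
            Err(RwError::from(ProtocolError("empty payload".into())))
        }
    };
}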
18 changes: 1 addition & 17 deletions src/connector/src/parser/unified/util.rs
@@ -14,7 +14,7 @@
 
 use risingwave_common::error::{ErrorCode, RwError};
 
-use super::{Access, AccessError, AccessImpl, AccessResult, ChangeEvent};
+use super::{Access, AccessError, AccessResult, ChangeEvent};
 use crate::parser::unified::ChangeEventOperation;
 use crate::parser::SourceStreamChunkRowWriter;
 use crate::source::SourceColumnDesc;
@@ -46,22 +46,6 @@ pub fn apply_row_accessor_on_stream_chunk_writer(
     writer.insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
 }
 
-pub fn apply_key_val_accessor_on_stream_chunk_writer(
-    key_column_name: &str,
-    key_accessor: AccessImpl<'_, '_>,
-    val_accessor: AccessImpl<'_, '_>,
-    writer: &mut SourceStreamChunkRowWriter<'_>,
-) -> AccessResult<()> {
-    let f = |column: &SourceColumnDesc| {
-        if column.name == key_column_name {
-            key_accessor.access(&[&column.name], Some(&column.data_type))
-        } else {
-            val_accessor.access(&[&column.name], Some(&column.data_type))
-        }
-    };
-    writer.insert(f)
-}
-
 impl From<AccessError> for RwError {
     fn from(val: AccessError) -> Self {
         ErrorCode::InternalError(format!("AccessError: {:?}", val)).into()
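With the key/value variant gone, `apply_row_accessor_on_stream_chunk_writer` is the only remaining path: every column is resolved by name against a single payload accessor, with no special-cased key branch. A minimal self-contained sketch of that pattern (toy types for illustration, not RisingWave's actual API):

// Toy model of the surviving single-accessor path; `Access`, `Column`, and
// `apply_row_accessor` are invented names for this sketch only.
trait Access {
    fn access(&self, path: &[&str]) -> Result<Option<String>, String>;
}

struct Column {
    name: String,
}

fn apply_row_accessor(
    columns: &[Column],
    accessor: &impl Access,
) -> Result<Vec<Option<String>>, String> {
    // One uniform lookup per column -- no key-column special case.
    columns.iter().map(|c| accessor.access(&[&c.name])).collect()
}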
16 changes: 0 additions & 16 deletions src/frontend/planner_test/tests/testdata/input/batch_source.yaml
@@ -30,19 +30,3 @@
   expected_outputs:
   - batch_plan
   - logical_plan
-- sql: |
-    insert into s values (1,2, E'\\xDEADBEEF'::bytea);
-  create_table_with_connector:
-    format: plain
-    encode: protobuf
-    name: s
-    file: |
-      syntax = "proto3";
-      package test;
-      message TestRecord {
-        int32 id = 1;
-        int32 value = 2;
-      }
-  expected_outputs:
-  - batch_plan
-  - logical_plan