Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: revert #13278 & #13390 for include syntax (#13785) #13788

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -368,17 +368,17 @@ create table s27 with (
) FORMAT PLAIN ENCODE JSON (schema.location = 'file:///risingwave/json-complex-schema')

# currently _rw_key can be set as primary key
statement ok
create table s28 (id bytea, PRIMARY KEY(_rw_key)) with (
connector = 'kafka',
topic = 'kafka_source_format_bytes',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE BYTES
# statement ok
# create table s28 (id bytea, PRIMARY KEY(_rw_key)) with (
# connector = 'kafka',
# topic = 'kafka_source_format_bytes',
# properties.bootstrap.server = 'message_queue:29092',
# scan.startup.mode = 'earliest'
# ) FORMAT PLAIN ENCODE BYTES

# throttle option
statement ok
create table s29 (id bytea, PRIMARY KEY(_rw_key)) with (
create table s29 (id bytea) with (
connector = 'kafka',
topic = 'kafka_source_format_bytes',
properties.bootstrap.server = 'message_queue:29092',
Expand Down Expand Up @@ -858,8 +858,8 @@ drop source s24
statement ok
drop table s27

statement ok
drop table s28
# statement ok
# drop table s28

statement ok
drop table s29
Expand Down
140 changes: 63 additions & 77 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ mod test {
use apache_avro::{Codec, Days, Duration, Millis, Months, Reader, Schema, Writer};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::catalog::{ColumnId, DEFAULT_KEY_COLUMN_NAME};
use risingwave_common::catalog::ColumnId;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Date, Interval, ScalarImpl, Timestamptz};
use risingwave_common::{error, try_match_expand};
Expand All @@ -221,12 +221,10 @@ mod test {
AvroParserConfig,
};
use crate::common::AwsAuthProps;
use crate::parser::bytes_parser::BytesAccessBuilder;
use crate::parser::plain_parser::PlainParser;
use crate::parser::unified::avro::unix_epoch_days;
use crate::parser::{
AccessBuilderImpl, BytesProperties, EncodingProperties, EncodingType,
SourceStreamChunkBuilder, SpecificParserConfig,
AccessBuilderImpl, EncodingType, SourceStreamChunkBuilder, SpecificParserConfig,
};
use crate::source::SourceColumnDesc;

Expand Down Expand Up @@ -307,11 +305,6 @@ mod test {
let conf = new_avro_conf_from_local(file_name).await?;

Ok(PlainParser {
key_builder: AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
EncodingProperties::Bytes(BytesProperties {
column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()),
}),
)?),
payload_builder: AccessBuilderImpl::Avro(AvroAccessBuilder::new(
conf,
EncodingType::Value,
Expand All @@ -335,75 +328,68 @@ mod test {
let flush = writer.flush().unwrap();
assert!(flush > 0);
let input_data = writer.into_inner().unwrap();
// _rw_key test cases
let key_testcases = vec![Some(br#"r"#.to_vec()), Some(vec![]), None];
let columns = build_rw_columns();
for key_data in key_testcases {
let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 1);
{
let writer = builder.row_writer();
parser
.parse_inner(key_data, Some(input_data.clone()), writer)
.await
.unwrap();
}
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
assert_eq!(op, Op::Insert);
let row = row.into_owned_row();
for (i, field) in record.fields.iter().enumerate() {
let value = field.clone().1;
match value {
Value::String(str) | Value::Union(_, box Value::String(str)) => {
assert_eq!(row[i], Some(ScalarImpl::Utf8(str.into_boxed_str())));
}
Value::Boolean(bool_val) => {
assert_eq!(row[i], Some(ScalarImpl::Bool(bool_val)));
}
Value::Int(int_val) => {
assert_eq!(row[i], Some(ScalarImpl::Int32(int_val)));
}
Value::Long(i64_val) => {
assert_eq!(row[i], Some(ScalarImpl::Int64(i64_val)));
}
Value::Float(f32_val) => {
assert_eq!(row[i], Some(ScalarImpl::Float32(f32_val.into())));
}
Value::Double(f64_val) => {
assert_eq!(row[i], Some(ScalarImpl::Float64(f64_val.into())));
}
Value::Date(days) => {
assert_eq!(
row[i],
Some(ScalarImpl::Date(
Date::with_days(days + unix_epoch_days()).unwrap(),
))
);
}
Value::TimestampMillis(millis) => {
assert_eq!(
row[i],
Some(Timestamptz::from_millis(millis).unwrap().into())
);
}
Value::TimestampMicros(micros) => {
assert_eq!(row[i], Some(Timestamptz::from_micros(micros).into()));
}
Value::Bytes(bytes) => {
assert_eq!(row[i], Some(ScalarImpl::Bytea(bytes.into_boxed_slice())));
}
Value::Duration(duration) => {
let months = u32::from(duration.months()) as i32;
let days = u32::from(duration.days()) as i32;
let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
assert_eq!(
row[i],
Some(Interval::from_month_day_usec(months, days, usecs).into())
);
}
_ => {
unreachable!()
}
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 1);
{
let writer = builder.row_writer();
parser.parse_inner(input_data, writer).await.unwrap();
}
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
assert_eq!(op, Op::Insert);
let row = row.into_owned_row();
for (i, field) in record.fields.iter().enumerate() {
let value = field.clone().1;
match value {
Value::String(str) | Value::Union(_, box Value::String(str)) => {
assert_eq!(row[i], Some(ScalarImpl::Utf8(str.into_boxed_str())));
}
Value::Boolean(bool_val) => {
assert_eq!(row[i], Some(ScalarImpl::Bool(bool_val)));
}
Value::Int(int_val) => {
assert_eq!(row[i], Some(ScalarImpl::Int32(int_val)));
}
Value::Long(i64_val) => {
assert_eq!(row[i], Some(ScalarImpl::Int64(i64_val)));
}
Value::Float(f32_val) => {
assert_eq!(row[i], Some(ScalarImpl::Float32(f32_val.into())));
}
Value::Double(f64_val) => {
assert_eq!(row[i], Some(ScalarImpl::Float64(f64_val.into())));
}
Value::Date(days) => {
assert_eq!(
row[i],
Some(ScalarImpl::Date(
Date::with_days(days + unix_epoch_days()).unwrap(),
))
);
}
Value::TimestampMillis(millis) => {
assert_eq!(
row[i],
Some(Timestamptz::from_millis(millis).unwrap().into())
);
}
Value::TimestampMicros(micros) => {
assert_eq!(row[i], Some(Timestamptz::from_micros(micros).into()));
}
Value::Bytes(bytes) => {
assert_eq!(row[i], Some(ScalarImpl::Bytea(bytes.into_boxed_slice())));
}
Value::Duration(duration) => {
let months = u32::from(duration.months()) as i32;
let days = u32::from(duration.days()) as i32;
let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
assert_eq!(
row[i],
Some(Interval::from_month_day_usec(months, days, usecs).into())
);
}
_ => {
unreachable!()
}
}
}
Expand Down
19 changes: 6 additions & 13 deletions src/connector/src/parser/bytes_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ mod tests {
SourceStreamChunkBuilder, SpecificParserConfig,
};

type Item = (Vec<u8>, Vec<u8>);
fn get_item() -> Vec<Item> {
vec![
(br#"a"#.to_vec(), br#"t"#.to_vec()),
(br#"r"#.to_vec(), br#"random"#.to_vec()),
]
fn get_payload() -> Vec<Vec<u8>> {
vec![br#"t"#.to_vec(), br#"random"#.to_vec()]
}

async fn test_bytes_parser(get_item: fn() -> Vec<Item>) {
async fn test_bytes_parser(get_payload: fn() -> Vec<Vec<u8>>) {
let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
let props = SpecificParserConfig {
key_encoding_config: None,
Expand All @@ -76,12 +72,9 @@ mod tests {

let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);

for item in get_item() {
for payload in get_payload() {
let writer = builder.row_writer();
parser
.parse_inner(Some(item.0), Some(item.1), writer)
.await
.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}

let chunk = builder.finish();
Expand All @@ -107,6 +100,6 @@ mod tests {

#[tokio::test]
async fn test_bytes_parse_object_top_level() {
test_bytes_parser(get_item).await;
test_bytes_parser(get_payload).await;
}
}
40 changes: 8 additions & 32 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::DEFAULT_KEY_COLUMN_NAME;
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{ErrorCode, Result, RwError};

use super::bytes_parser::BytesAccessBuilder;
use super::unified::util::apply_key_val_accessor_on_stream_chunk_writer;
use super::unified::util::apply_row_accessor_on_stream_chunk_writer;
use super::{
AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties, EncodingType,
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::only_parse_payload;
use crate::parser::ParserFormat;
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
pub struct PlainParser {
pub key_builder: AccessBuilderImpl,
pub payload_builder: AccessBuilderImpl,
pub(crate) rw_columns: Vec<SourceColumnDesc>,
pub source_ctx: SourceContextRef,
Expand All @@ -39,11 +37,6 @@ impl PlainParser {
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> Result<Self> {
let key_builder = AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
EncodingProperties::Bytes(BytesProperties {
column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()),
}),
)?);
let payload_builder = match props.encoding_config {
EncodingProperties::Protobuf(_)
| EncodingProperties::Avro(_)
Expand All @@ -57,7 +50,6 @@ impl PlainParser {
}
};
Ok(Self {
key_builder,
payload_builder,
rw_columns,
source_ctx,
Expand All @@ -66,28 +58,12 @@ impl PlainParser {

pub async fn parse_inner(
&mut self,
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<()> {
// if key is empty, set it as vec![]
let key_data = key.unwrap_or_default();
// if payload is empty, report error
let payload_data = payload.ok_or_else(|| {
RwError::from(ErrorCode::InternalError(
"Empty payload with nonempty key".into(),
))
})?;
let accessor = self.payload_builder.generate_accessor(payload).await?;

let key_accessor = self.key_builder.generate_accessor(key_data).await?;
let payload_accessor = self.payload_builder.generate_accessor(payload_data).await?;
apply_key_val_accessor_on_stream_chunk_writer(
DEFAULT_KEY_COLUMN_NAME,
key_accessor,
payload_accessor,
&mut writer,
)
.map_err(Into::into)
apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer).map_err(Into::into)
}
}

Expand All @@ -106,10 +82,10 @@ impl ByteStreamSourceParser for PlainParser {

async fn parse_one<'a>(
&'a mut self,
key: Option<Vec<u8>>,
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<()> {
self.parse_inner(key, payload, writer).await
only_parse_payload!(self, payload, writer)
}
}
18 changes: 1 addition & 17 deletions src/connector/src/parser/unified/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use risingwave_common::error::{ErrorCode, RwError};

use super::{Access, AccessError, AccessImpl, AccessResult, ChangeEvent};
use super::{Access, AccessError, AccessResult, ChangeEvent};
use crate::parser::unified::ChangeEventOperation;
use crate::parser::SourceStreamChunkRowWriter;
use crate::source::SourceColumnDesc;
Expand Down Expand Up @@ -46,22 +46,6 @@ pub fn apply_row_accessor_on_stream_chunk_writer(
writer.insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
}

pub fn apply_key_val_accessor_on_stream_chunk_writer(
key_column_name: &str,
key_accessor: AccessImpl<'_, '_>,
val_accessor: AccessImpl<'_, '_>,
writer: &mut SourceStreamChunkRowWriter<'_>,
) -> AccessResult<()> {
let f = |column: &SourceColumnDesc| {
if column.name == key_column_name {
key_accessor.access(&[&column.name], Some(&column.data_type))
} else {
val_accessor.access(&[&column.name], Some(&column.data_type))
}
};
writer.insert(f)
}

impl From<AccessError> for RwError {
fn from(val: AccessError) -> Self {
ErrorCode::InternalError(format!("AccessError: {:?}", val)).into()
Expand Down
16 changes: 0 additions & 16 deletions src/frontend/planner_test/tests/testdata/input/batch_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,3 @@
expected_outputs:
- batch_plan
- logical_plan
- sql: |
insert into s values (1,2, E'\\xDEADBEEF'::bytea);
create_table_with_connector:
format: plain
encode: protobuf
name: s
file: |
syntax = "proto3";
package test;
message TestRecord {
int32 id = 1;
int32 value = 2;
}
expected_outputs:
- batch_plan
- logical_plan
Loading