refactor(source): handle INCLUDE header data type properly (#18821)
xiangjinwu authored Oct 16, 2024
1 parent 6e775bd commit b7f9da4
Showing 5 changed files with 57 additions and 67 deletions.
51 changes: 14 additions & 37 deletions src/connector/src/parser/additional_columns.rs
@@ -18,8 +18,6 @@ use std::sync::LazyLock;
 use risingwave_common::bail;
 use risingwave_common::catalog::{max_column_id, ColumnCatalog, ColumnDesc, ColumnId};
 use risingwave_common::types::{DataType, StructType};
-use risingwave_pb::data::data_type::TypeName;
-use risingwave_pb::data::DataType as PbDataType;
 use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 use risingwave_pb::plan_common::{
     AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
@@ -118,13 +116,14 @@ pub fn gen_default_addition_col_name(
     connector_name: &str,
     additional_col_type: &str,
     inner_field_name: Option<&str>,
-    data_type: Option<&str>,
+    data_type: Option<&DataType>,
 ) -> String {
+    let legacy_dt_name = data_type.map(|dt| format!("{:?}", dt).to_lowercase());
     let col_name = [
         Some(connector_name),
         Some(additional_col_type),
         inner_field_name,
-        data_type,
+        legacy_dt_name.as_deref(),
     ];
     col_name.iter().fold("_rw".to_string(), |name, ele| {
         if let Some(ele) = ele {
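
The fold above joins each present part onto the `_rw` prefix with underscores, and the new `legacy_dt_name` keeps the lowercased `Debug` form of the data type so previously generated column names stay stable. A minimal standalone sketch of that composition; `compose_col_name` is a hypothetical helper, not the crate's API:

```rust
// Standalone sketch of the fold above; illustrative only.
fn compose_col_name(parts: &[Option<&str>]) -> String {
    parts.iter().fold("_rw".to_string(), |name, ele| match ele {
        // Join each present part with an underscore, skip absent ones.
        Some(ele) => format!("{}_{}", name, ele),
        None => name,
    })
}

fn main() {
    // With data_type = Some(DataType::Varchar), the legacy suffix is the
    // lowercased Debug form, i.e. "varchar".
    let name = compose_col_name(&[Some("kafka"), Some("header"), Some("inner"), Some("varchar")]);
    assert_eq!(name, "_rw_kafka_header_inner_varchar");
}
```
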
@@ -141,7 +140,7 @@ pub fn build_additional_column_desc(
     additional_col_type: &str,
     column_alias: Option<String>,
     inner_field_name: Option<&str>,
-    data_type: Option<&str>,
+    data_type: Option<&DataType>,
     reject_unknown_connector: bool,
     is_cdc_backfill_table: bool,
 ) -> ConnectorResult<ColumnDesc> {
@@ -352,42 +351,15 @@ fn build_header_catalog(
     column_id: ColumnId,
     col_name: &str,
     inner_field_name: Option<&str>,
-    data_type: Option<&str>,
+    data_type: Option<&DataType>,
 ) -> ColumnDesc {
     if let Some(inner) = inner_field_name {
-        let (data_type, pb_data_type) = {
-            if let Some(type_name) = data_type {
-                match type_name {
-                    "bytea" => (
-                        DataType::Bytea,
-                        PbDataType {
-                            type_name: TypeName::Bytea as i32,
-                            ..Default::default()
-                        },
-                    ),
-                    "varchar" => (
-                        DataType::Varchar,
-                        PbDataType {
-                            type_name: TypeName::Varchar as i32,
-                            ..Default::default()
-                        },
-                    ),
-                    _ => unreachable!(),
-                }
-            } else {
-                (
-                    DataType::Bytea,
-                    PbDataType {
-                        type_name: TypeName::Bytea as i32,
-                        ..Default::default()
-                    },
-                )
-            }
-        };
+        let data_type = data_type.unwrap_or(&DataType::Bytea);
+        let pb_data_type = data_type.to_protobuf();
         ColumnDesc::named_with_additional_column(
             col_name,
             column_id,
-            data_type,
+            data_type.clone(),
             AdditionalColumn {
                 column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader {
                     inner_field: inner.to_string(),
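
The deleted arms built `PbDataType` messages by hand; the replacement derives them via `DataType::to_protobuf`. A hedged sketch of the equivalence the refactor relies on, assuming the prost-generated message derives `PartialEq` and that `to_protobuf` returns the message by value:

```rust
use risingwave_common::types::DataType;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType as PbDataType;

fn main() {
    // The hand-rolled message the old match arms produced for BYTEA...
    let by_hand = PbDataType {
        type_name: TypeName::Bytea as i32,
        ..Default::default()
    };
    // ...should match what to_protobuf() derives from the logical type.
    assert_eq!(DataType::Bytea.to_protobuf(), by_hand);
}
```
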
@@ -427,7 +399,12 @@ mod test {
             "_rw_kafka_header_inner"
         );
         assert_eq!(
-            gen_default_addition_col_name("kafka", "header", Some("inner"), Some("varchar")),
+            gen_default_addition_col_name(
+                "kafka",
+                "header",
+                Some("inner"),
+                Some(&DataType::Varchar)
+            ),
             "_rw_kafka_header_inner_varchar"
         );
     }
14 changes: 11 additions & 3 deletions src/frontend/src/handler/create_source.rs
@@ -655,16 +655,24 @@ pub fn handle_addition_columns(
     while let Some(item) = additional_columns.pop() {
         check_additional_column_compatibility(&item, source_schema)?;
 
-        let data_type_name: Option<String> = item
+        let data_type = item
             .header_inner_expect_type
-            .map(|dt| format!("{:?}", dt).to_lowercase());
+            .map(|dt| bind_data_type(&dt))
+            .transpose()?;
+        if let Some(dt) = &data_type
+            && !matches!(dt, DataType::Bytea | DataType::Varchar)
+        {
+            return Err(
+                ErrorCode::BindError(format!("invalid additional column data type: {dt}")).into(),
+            );
+        }
         let col = build_additional_column_desc(
             latest_col_id.next(),
             connector_name.as_str(),
             item.column_type.real_value().as_str(),
             item.column_alias.map(|alias| alias.real_value()),
             item.inner_field.as_deref(),
-            data_type_name.as_deref(),
+            data_type.as_ref(),
             true,
             is_cdc_backfill_table,
         )?;
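
The `map(...).transpose()?` chain is the key move: it turns `Option<Result<_, _>>` into `Result<Option<_>, _>`, so a failed `bind_data_type` propagates as an error while an absent type simply stays `None`. A dependency-free sketch of that pattern; the names here are illustrative stand-ins:

```rust
use std::num::ParseIntError;

// Hypothetical stand-in for bind_data_type: binding can fail.
fn bind(raw: Option<&str>) -> Result<Option<u32>, ParseIntError> {
    // transpose: Option<Result<T, E>> -> Result<Option<T>, E>
    raw.map(|s| s.parse::<u32>()).transpose()
}

fn main() {
    assert_eq!(bind(None), Ok(None)); // absent stays None
    assert_eq!(bind(Some("42")), Ok(Some(42))); // present and valid
    assert!(bind(Some("nope")).is_err()); // present but invalid propagates
}
```
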
10 changes: 1 addition & 9 deletions src/sqlparser/src/ast/mod.rs
@@ -2183,15 +2183,7 @@ impl Display for IncludeOptionItem {
         if let Some(inner_field) = inner_field {
             write!(f, " '{}'", value::escape_single_quote_string(inner_field))?;
             if let Some(expected_type) = header_inner_expect_type {
-                write!(
-                    f,
-                    " {}",
-                    match expected_type {
-                        DataType::Varchar => "varchar",
-                        DataType::Bytea => "bytea",
-                        t => unreachable!("unparse header expected type: {t}"),
-                    }
-                )?;
+                write!(f, " {}", expected_type)?;
             }
         }
         if let Some(alias) = column_alias {
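
Delegating to `DataType`'s `Display` impl unparses the type in its canonical spelling instead of through a hardcoded lowercase match; per the updated test data below, `varchar` round-trips as `CHARACTER VARYING`. A sketch of that expectation, assuming the `risingwave_sqlparser` crate path:

```rust
use risingwave_sqlparser::ast::DataType;

fn main() {
    // Canonical spellings produced by the Display impl, per the yaml expectations.
    assert_eq!(DataType::Varchar.to_string(), "CHARACTER VARYING");
    assert_eq!(DataType::Bytea.to_string(), "BYTEA");
}
```
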
40 changes: 24 additions & 16 deletions src/sqlparser/src/parser.rs
@@ -2634,22 +2634,30 @@ impl Parser<'_> {
                 self.next_token();
                 column_inner_field = Some(inner_field);
 
-                if let Token::Word(w) = self.peek_token().token {
-                    match w.keyword {
-                        Keyword::BYTEA => {
-                            header_inner_expect_type = Some(DataType::Bytea);
-                            self.next_token();
-                        }
-                        Keyword::VARCHAR => {
-                            header_inner_expect_type = Some(DataType::Varchar);
-                            self.next_token();
-                        }
-                        _ => {
-                            // default to bytea
-                            header_inner_expect_type = Some(DataType::Bytea);
-                        }
-                    }
-                }
+                // `verify` rejects `DataType::Custom` so that a following `INCLUDE` (or even `WITH`)
+                // will not be misrecognized as a DataType.
+                //
+                // For example, the following two look structurally the same because `INCLUDE` is
+                // not a reserved keyword. (`AS` is reserved.)
+                // * `INCLUDE header 'foo' varchar`
+                // * `INCLUDE header 'foo' INCLUDE`
+                //
+                // Strictly speaking, `bytea` should be a `DataType::Custom` rather than a keyword,
+                // and the logic here would be:
+                // ```
+                // match dt {
+                //     DataType::Custom(name) => allowed.contains(name.real_value()),
+                //     _ => true,
+                // }
+                // ```
+                // An allowlist is better than a denylist, as the following token may be something
+                // other than `INCLUDE` or `WITH` in the future.
+                //
+                // If this sounds too complicated, it means we should have designed this extension
+                // syntax differently to make ambiguity handling easier.
+                header_inner_expect_type =
+                    opt(parser_v2::data_type.verify(|dt| !matches!(dt, DataType::Custom(_))))
+                        .parse_next(self)?;
             }
 
             let mut column_alias = None;
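
A dependency-free sketch of the allowlist idea the new comment floats, under the assumption that `bytea` were parsed as a custom name; `accept_as_header_type` is a hypothetical helper, not the parser's real code:

```rust
// Decide whether a bare word after `INCLUDE header 'foo'` names an expected
// type: only listed words count, so a clause keyword is never swallowed.
fn accept_as_header_type(word: &str) -> bool {
    const ALLOWED: [&str; 2] = ["bytea", "varchar"];
    ALLOWED.contains(&word.to_ascii_lowercase().as_str())
}

fn main() {
    assert!(accept_as_header_type("varchar"));
    assert!(accept_as_header_type("BYTEA"));
    // The next clause's keyword is left for the caller to parse.
    assert!(!accept_as_header_type("INCLUDE"));
    assert!(!accept_as_header_type("WITH"));
}
```
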
9 changes: 7 additions & 2 deletions src/sqlparser/tests/testdata/create.yaml
@@ -62,19 +62,24 @@
       kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
       topic = 'dummy')
     FORMAT plain ENCODE bytes;
-  formatted_sql: CREATE SOURCE s (raw BYTEA) INCLUDE header AS all_headers INCLUDE header 'foo' bytea AS foo_bytea INCLUDE header 'foo' varchar AS foo_str WITH (connector = 'kafka', kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'dummy') FORMAT PLAIN ENCODE BYTES
+  formatted_sql: CREATE SOURCE s (raw BYTEA) INCLUDE header AS all_headers INCLUDE header 'foo' AS foo_bytea INCLUDE header 'foo' CHARACTER VARYING AS foo_str WITH (connector = 'kafka', kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'dummy') FORMAT PLAIN ENCODE BYTES
 - input: |-
     CREATE TABLE t
     (raw BYTEA)
     INCLUDE header AS all_headers
     INCLUDE header 'foo' AS foo_bytea, -- tolerate extra comma due to previous bug #18800
     INCLUDE header 'foo' VARCHAR AS foo_str
+    INCLUDE header
+    INCLUDE header 'foo'
+    INCLUDE header 'foo' VARCHAR
+    INCLUDE header 'foo' bytea
+    INCLUDE header 'bar'
     WITH (
       connector = 'kafka',
       kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
       topic = 'dummy')
     FORMAT plain ENCODE bytes;
-  formatted_sql: CREATE TABLE t (raw BYTEA) INCLUDE header AS all_headers INCLUDE header 'foo' bytea AS foo_bytea INCLUDE header 'foo' varchar AS foo_str WITH (connector = 'kafka', kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'dummy') FORMAT PLAIN ENCODE BYTES
+  formatted_sql: CREATE TABLE t (raw BYTEA) INCLUDE header AS all_headers INCLUDE header 'foo' AS foo_bytea INCLUDE header 'foo' CHARACTER VARYING AS foo_str INCLUDE header INCLUDE header 'foo' INCLUDE header 'foo' CHARACTER VARYING INCLUDE header 'foo' BYTEA INCLUDE header 'bar' WITH (connector = 'kafka', kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'dummy') FORMAT PLAIN ENCODE BYTES
 - input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
   formatted_sql: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
 - input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT, v3 STRUCT<v1 INT, v2 INT>>)