Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sqlparser): rename SourceSchemaV2/SinkSchema to ConnectorSchema #13065

Merged
merged 3 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_pb::catalog::Table;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnOption, Encode, ObjectName, SourceSchemaV2, Statement,
AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement,
};
use risingwave_sqlparser::parser::Parser;

Expand All @@ -32,7 +32,7 @@ use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::handler::create_table::gen_create_table_plan_with_source;
use crate::{build_graph, Binder, OptimizerContext, TableCatalog};
use crate::{build_graph, Binder, OptimizerContext, TableCatalog, WithOptions};

/// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or
/// `DropColumn`.
Expand Down Expand Up @@ -262,12 +262,12 @@ pub async fn handle_alter_table_column(
Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

fn schema_has_schema_registry(schema: &SourceSchemaV2) -> bool {
fn schema_has_schema_registry(schema: &ConnectorSchema) -> bool {
match schema.row_encode {
Encode::Avro | Encode::Protobuf => true,
Encode::Json => {
let mut options = schema.gen_options().unwrap();
matches!(get_json_schema_location(&mut options), Ok(Some(_)))
let mut options = WithOptions::try_from(schema.row_options()).unwrap();
matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_)))
}
_ => false,
}
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use risingwave_connector::sink::{
};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_sqlparser::ast::{
CreateSink, CreateSinkStatement, EmitMode, Encode, Format, ObjectName, Query, Select,
SelectItem, SetExpr, SinkSchema, TableFactor, TableWithJoins,
ConnectorSchema, CreateSink, CreateSinkStatement, EmitMode, Encode, Format, ObjectName, Query,
Select, SelectItem, SetExpr, TableFactor, TableWithJoins,
};

use super::create_mv::get_column_names;
Expand Down Expand Up @@ -228,7 +228,7 @@ pub async fn handle_create_sink(
/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
/// This is an analogy to (part of) [`crate::handler::create_source::try_bind_columns_from_source`]
/// which transforms sqlparser AST `ConnectorSchema` into `StreamSourceInfo`.
fn bind_sink_format_desc(value: SinkSchema) -> Result<SinkFormatDesc> {
fn bind_sink_format_desc(value: ConnectorSchema) -> Result<SinkFormatDesc> {
use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
use risingwave_sqlparser::ast::{Encode as E, Format as F};

Expand Down Expand Up @@ -288,7 +288,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
),
))
});
pub fn validate_compatibility(connector: &str, format_desc: &SinkSchema) -> Result<()> {
pub fn validate_compatibility(connector: &str, format_desc: &ConnectorSchema) -> Result<()> {
let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
.get(connector)
.ok_or_else(|| {
Expand Down
15 changes: 7 additions & 8 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::LazyLock;

use anyhow::anyhow;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
Expand Down Expand Up @@ -46,8 +45,8 @@ use risingwave_pb::catalog::{
};
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_sqlparser::ast::{
self, get_delimiter, AstString, AvroSchema, ColumnDef, ColumnOption, CreateSourceStatement,
DebeziumAvroSchema, Encode, Format, ProtobufSchema, SourceSchemaV2, SourceWatermark,
self, get_delimiter, AstString, AvroSchema, ColumnDef, ColumnOption, ConnectorSchema,
CreateSourceStatement, DebeziumAvroSchema, Encode, Format, ProtobufSchema, SourceWatermark,
};

use super::RwPgResponse;
Expand Down Expand Up @@ -278,7 +277,7 @@ fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Opti
/// Resolve the schema of the source from an external schema file and return the relation's columns. See <https://www.risingwave.dev/docs/current/sql-create-source> for more information.
/// Returns `(columns, pk_names, source info)`.
pub(crate) async fn try_bind_columns_from_source(
source_schema: &SourceSchemaV2,
source_schema: &ConnectorSchema,
sql_defined_pk_names: Vec<String>,
sql_defined_columns: &[ColumnDef],
with_properties: &HashMap<String, String>,
Expand All @@ -290,7 +289,7 @@ pub(crate) async fn try_bind_columns_from_source(
let sql_defined_pk = !sql_defined_pk_names.is_empty();
let sql_defined_schema = !sql_defined_columns.is_empty();
let is_kafka: bool = is_kafka_connector(with_properties);
let mut options = source_schema.gen_options().map_err(|e| anyhow!(e))?;
let mut options = WithOptions::try_from(source_schema.row_options())?.into_inner();

let get_key_message_name = |options: &mut BTreeMap<String, String>| -> Option<String> {
consume_string_from_options(options, KEY_MESSAGE_NAME_KEY)
Expand Down Expand Up @@ -904,7 +903,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
});

pub fn validate_compatibility(
source_schema: &SourceSchemaV2,
source_schema: &ConnectorSchema,
props: &mut HashMap<String, String>,
) -> Result<()> {
let connector = get_connector(props)
Expand All @@ -922,8 +921,8 @@ pub fn validate_compatibility(
if connector != KAFKA_CONNECTOR {
let res = match (&source_schema.format, &source_schema.row_encode) {
(Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
let mut options = source_schema.gen_options().map_err(|e| anyhow!(e))?;
let (_, use_schema_registry) = get_schema_location(&mut options)?;
let mut options = WithOptions::try_from(source_schema.row_options())?;
let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
use_schema_registry
}
(Format::Debezium, Encode::Avro) => true,
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{DefaultColumnDesc, GeneratedColumnDesc};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_sqlparser::ast::{
ColumnDef, ColumnOption, DataType as AstDataType, Format, ObjectName, SourceSchemaV2,
ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Format, ObjectName,
SourceWatermark, TableConstraint,
};

Expand Down Expand Up @@ -436,7 +436,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
table_name: ObjectName,
column_defs: Vec<ColumnDef>,
constraints: Vec<TableConstraint>,
source_schema: SourceSchemaV2,
source_schema: ConnectorSchema,
source_watermarks: Vec<SourceWatermark>,
mut col_id_gen: ColumnIdGenerator,
append_only: bool,
Expand Down Expand Up @@ -749,7 +749,7 @@ pub async fn handle_create_table(
columns: Vec<ColumnDef>,
constraints: Vec<TableConstraint>,
if_not_exists: bool,
source_schema: Option<SourceSchemaV2>,
source_schema: Option<ConnectorSchema>,
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
notice: Option<String>,
Expand Down Expand Up @@ -826,8 +826,8 @@ pub async fn handle_create_table(

pub fn check_create_table_with_source(
with_options: &WithOptions,
source_schema: Option<SourceSchemaV2>,
) -> Result<Option<SourceSchemaV2>> {
source_schema: Option<ConnectorSchema>,
) -> Result<Option<ConnectorSchema>> {
if with_options.inner().contains_key(UPSTREAM_SOURCE_KEY) {
source_schema.as_ref().ok_or_else(|| {
ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
Expand Down
94 changes: 20 additions & 74 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use core::fmt;
use std::collections::BTreeMap;
use std::fmt::Write;

use itertools::Itertools;
Expand Down Expand Up @@ -105,7 +104,7 @@ pub enum SourceSchema {
}

impl SourceSchema {
pub fn into_source_schema_v2(self) -> SourceSchemaV2 {
pub fn into_source_schema_v2(self) -> ConnectorSchema {
let (format, row_encode) = match self {
SourceSchema::Protobuf(_) => (Format::Plain, Encode::Protobuf),
SourceSchema::Json => (Format::Plain, Encode::Json),
Expand Down Expand Up @@ -205,7 +204,7 @@ impl SourceSchema {
_ => vec![],
};

SourceSchemaV2 {
ConnectorSchema {
format,
row_encode,
row_options,
Expand Down Expand Up @@ -336,7 +335,7 @@ impl Encode {

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SourceSchemaV2 {
pub struct ConnectorSchema {
pub format: Format,
pub row_encode: Encode,
pub row_options: Vec<SqlOption>,
Expand All @@ -346,7 +345,7 @@ pub struct SourceSchemaV2 {
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CompatibleSourceSchema {
RowFormat(SourceSchema),
V2(SourceSchemaV2),
V2(ConnectorSchema),
}

impl fmt::Display for CompatibleSourceSchema {
Expand All @@ -363,7 +362,7 @@ impl fmt::Display for CompatibleSourceSchema {
}

impl CompatibleSourceSchema {
pub fn into_source_schema_v2(self) -> (SourceSchemaV2, Option<String>) {
pub fn into_source_schema_v2(self) -> (ConnectorSchema, Option<String>) {
match self {
CompatibleSourceSchema::RowFormat(inner) => (
inner.into_source_schema_v2(),
Expand All @@ -373,29 +372,15 @@ impl CompatibleSourceSchema {
}
}

impl From<SourceSchemaV2> for CompatibleSourceSchema {
fn from(value: SourceSchemaV2) -> Self {
impl From<ConnectorSchema> for CompatibleSourceSchema {
fn from(value: ConnectorSchema) -> Self {
Self::V2(value)
}
}

fn parse_source_schema(p: &mut Parser) -> Result<CompatibleSourceSchema, ParserError> {
if p.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) {
p.expect_keyword(Keyword::FORMAT)?;
let id = p.parse_identifier()?;
let s = id.value.to_ascii_uppercase();
let format = Format::from_keyword(&s)?;
p.expect_keyword(Keyword::ENCODE)?;
let id = p.parse_identifier()?;
let s = id.value.to_ascii_uppercase();
let row_encode = Encode::from_keyword(&s)?;
let row_options = p.parse_options()?;

Ok(CompatibleSourceSchema::V2(SourceSchemaV2 {
format,
row_encode,
row_options,
}))
if let Some(schema_v2) = p.parse_schema()? {
Ok(CompatibleSourceSchema::V2(schema_v2))
} else if p.peek_nth_any_of_keywords(0, &[Keyword::ROW])
&& p.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])
{
Expand Down Expand Up @@ -465,7 +450,7 @@ impl Parser {
// row format for nexmark source must be native
// default row format for datagen source is native
if connector.contains("-cdc") {
let expected = SourceSchemaV2::debezium_json();
let expected = ConnectorSchema::debezium_json();
if self.peek_source_schema_format() {
let schema = parse_source_schema(self)?.into_source_schema_v2().0;
if schema != expected {
Expand All @@ -477,7 +462,7 @@ impl Parser {
}
Ok(expected.into())
} else if connector.contains("nexmark") {
let expected = SourceSchemaV2::native();
let expected = ConnectorSchema::native();
if self.peek_source_schema_format() {
let schema = parse_source_schema(self)?.into_source_schema_v2().0;
if schema != expected {
Expand All @@ -492,17 +477,15 @@ impl Parser {
Ok(if self.peek_source_schema_format() {
parse_source_schema(self)?
} else {
SourceSchemaV2::native().into()
ConnectorSchema::native().into()
})
} else {
Ok(parse_source_schema(self)?)
}
}

/// Parse `FORMAT ... ENCODE ... (...)` in `CREATE SINK`.
///
/// TODO: After [`SourceSchemaV2`] and [`SinkSchema`] merge, call this in [`parse_source_schema`].
pub fn parse_schema(&mut self) -> Result<Option<SinkSchema>, ParserError> {
/// Parse `FORMAT ... ENCODE ... (...)` in `CREATE SOURCE` and `CREATE SINK`.
pub fn parse_schema(&mut self) -> Result<Option<ConnectorSchema>, ParserError> {
if !self.parse_keyword(Keyword::FORMAT) {
return Ok(None);
}
Expand All @@ -516,18 +499,18 @@ impl Parser {
let row_encode = Encode::from_keyword(&s)?;
let row_options = self.parse_options()?;

Ok(Some(SinkSchema {
Ok(Some(ConnectorSchema {
format,
row_encode,
row_options,
}))
}
}

impl SourceSchemaV2 {
impl ConnectorSchema {
/// Create a new source schema with `Debezium` format and `Json` encoding.
pub const fn debezium_json() -> Self {
SourceSchemaV2 {
ConnectorSchema {
format: Format::Debezium,
row_encode: Encode::Json,
row_options: Vec::new(),
Expand All @@ -536,35 +519,19 @@ impl SourceSchemaV2 {

/// Create a new source schema with `Native` format and encoding.
pub const fn native() -> Self {
SourceSchemaV2 {
ConnectorSchema {
format: Format::Native,
row_encode: Encode::Native,
row_options: Vec::new(),
}
}

/// Converts `row_options` into a map from option name to its string value.
///
/// C-style escaped strings, single-quoted strings, numbers and booleans are
/// accepted; any other value kind yields a `ParserError`. When the same name
/// occurs more than once, the later entry replaces the earlier one.
pub fn gen_options(&self) -> Result<BTreeMap<String, String>, ParserError> {
    let mut collected = BTreeMap::new();
    for option in self.row_options.iter().cloned() {
        let text = match option.value {
            Value::CstyleEscapedString(escaped) => escaped.value,
            Value::SingleQuotedString(quoted) => quoted,
            Value::Number(number) => number,
            Value::Boolean(flag) => flag.to_string(),
            _ => {
                return Err(ParserError::ParserError(
                    "`row format options` only support single quoted string value and C style escaped string".to_owned(),
                ));
            }
        };
        // The name is resolved only after the value has been accepted.
        collected.insert(option.name.real_value(), text);
    }
    Ok(collected)
}

/// Borrows the row options as a slice.
pub fn row_options(&self) -> &[SqlOption] {
    &self.row_options
}
}

impl fmt::Display for SourceSchemaV2 {
impl fmt::Display for ConnectorSchema {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;

Expand Down Expand Up @@ -823,27 +790,6 @@ impl fmt::Display for CreateSink {
}
}

/// Same as [`SourceSchemaV2`]. Will be merged in a dedicated rename PR.
///
/// Holds the pieces of a `FORMAT ... ENCODE ... (...)` clause; the `Display`
/// impl prints them back in that form.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SinkSchema {
/// Value printed after the `FORMAT` keyword.
pub format: Format,
/// Value printed after the `ENCODE` keyword.
pub row_encode: Encode,
/// Options printed as a parenthesized, comma-separated list; left out of the
/// output entirely when empty.
pub row_options: Vec<SqlOption>,
}

impl fmt::Display for SinkSchema {
    /// Writes `FORMAT <format> ENCODE <encode>`, followed by ` (<options>)`
    /// with comma-separated options when at least one option is present.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;

        // Nothing more to print when no options were given.
        if self.row_options.is_empty() {
            return Ok(());
        }

        write!(f, " ({})", display_comma_separated(&self.row_options))
    }
}

// sql_grammar!(CreateSinkStatement {
// if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
// sink_name: Ident,
Expand All @@ -860,7 +806,7 @@ pub struct CreateSinkStatement {
pub sink_from: CreateSink,
pub columns: Vec<Ident>,
pub emit_mode: Option<EmitMode>,
pub sink_schema: Option<SinkSchema>,
pub sink_schema: Option<ConnectorSchema>,
}

impl ParseTo for CreateSinkStatement {
Expand Down
Loading