Skip to content

Commit

Permalink
refactor(sqlparser): rename SourceSchemaV2/SinkSchema to `Connect…
Browse files Browse the repository at this point in the history
…orSchema` (#13065)
  • Loading branch information
xiangjinwu authored Oct 26, 2023
1 parent e74b32d commit 41d2bbd
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 99 deletions.
10 changes: 5 additions & 5 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_pb::catalog::Table;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnOption, Encode, ObjectName, SourceSchemaV2, Statement,
AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement,
};
use risingwave_sqlparser::parser::Parser;

Expand All @@ -32,7 +32,7 @@ use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::handler::create_table::gen_create_table_plan_with_source;
use crate::{build_graph, Binder, OptimizerContext, TableCatalog};
use crate::{build_graph, Binder, OptimizerContext, TableCatalog, WithOptions};

/// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or
/// `DropColumn`.
Expand Down Expand Up @@ -262,12 +262,12 @@ pub async fn handle_alter_table_column(
Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

fn schema_has_schema_registry(schema: &SourceSchemaV2) -> bool {
fn schema_has_schema_registry(schema: &ConnectorSchema) -> bool {
match schema.row_encode {
Encode::Avro | Encode::Protobuf => true,
Encode::Json => {
let mut options = schema.gen_options().unwrap();
matches!(get_json_schema_location(&mut options), Ok(Some(_)))
let mut options = WithOptions::try_from(schema.row_options()).unwrap();
matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_)))
}
_ => false,
}
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use risingwave_connector::sink::{
};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_sqlparser::ast::{
CreateSink, CreateSinkStatement, EmitMode, Encode, Format, ObjectName, Query, Select,
SelectItem, SetExpr, SinkSchema, TableFactor, TableWithJoins,
ConnectorSchema, CreateSink, CreateSinkStatement, EmitMode, Encode, Format, ObjectName, Query,
Select, SelectItem, SetExpr, TableFactor, TableWithJoins,
};

use super::create_mv::get_column_names;
Expand Down Expand Up @@ -228,7 +228,7 @@ pub async fn handle_create_sink(
/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
/// This is analogous to (part of) [`crate::handler::create_source::try_bind_columns_from_source`],
/// which transforms the sqlparser AST `ConnectorSchema` into `StreamSourceInfo`.
fn bind_sink_format_desc(value: SinkSchema) -> Result<SinkFormatDesc> {
fn bind_sink_format_desc(value: ConnectorSchema) -> Result<SinkFormatDesc> {
use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
use risingwave_sqlparser::ast::{Encode as E, Format as F};

Expand Down Expand Up @@ -288,7 +288,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
),
))
});
pub fn validate_compatibility(connector: &str, format_desc: &SinkSchema) -> Result<()> {
pub fn validate_compatibility(connector: &str, format_desc: &ConnectorSchema) -> Result<()> {
let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
.get(connector)
.ok_or_else(|| {
Expand Down
15 changes: 7 additions & 8 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::LazyLock;

use anyhow::anyhow;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
Expand Down Expand Up @@ -46,8 +45,8 @@ use risingwave_pb::catalog::{
};
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_sqlparser::ast::{
self, get_delimiter, AstString, AvroSchema, ColumnDef, ColumnOption, CreateSourceStatement,
DebeziumAvroSchema, Encode, Format, ProtobufSchema, SourceSchemaV2, SourceWatermark,
self, get_delimiter, AstString, AvroSchema, ColumnDef, ColumnOption, ConnectorSchema,
CreateSourceStatement, DebeziumAvroSchema, Encode, Format, ProtobufSchema, SourceWatermark,
};

use super::RwPgResponse;
Expand Down Expand Up @@ -278,7 +277,7 @@ fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Opti
/// Resolves the schema of the source from an external schema file and returns the relation's columns. See <https://www.risingwave.dev/docs/current/sql-create-source> for more information.
/// Returns `(columns, pk_names, source info)`.
pub(crate) async fn try_bind_columns_from_source(
source_schema: &SourceSchemaV2,
source_schema: &ConnectorSchema,
sql_defined_pk_names: Vec<String>,
sql_defined_columns: &[ColumnDef],
with_properties: &HashMap<String, String>,
Expand All @@ -290,7 +289,7 @@ pub(crate) async fn try_bind_columns_from_source(
let sql_defined_pk = !sql_defined_pk_names.is_empty();
let sql_defined_schema = !sql_defined_columns.is_empty();
let is_kafka: bool = is_kafka_connector(with_properties);
let mut options = source_schema.gen_options().map_err(|e| anyhow!(e))?;
let mut options = WithOptions::try_from(source_schema.row_options())?.into_inner();

let get_key_message_name = |options: &mut BTreeMap<String, String>| -> Option<String> {
consume_string_from_options(options, KEY_MESSAGE_NAME_KEY)
Expand Down Expand Up @@ -904,7 +903,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
});

pub fn validate_compatibility(
source_schema: &SourceSchemaV2,
source_schema: &ConnectorSchema,
props: &mut HashMap<String, String>,
) -> Result<()> {
let connector = get_connector(props)
Expand All @@ -922,8 +921,8 @@ pub fn validate_compatibility(
if connector != KAFKA_CONNECTOR {
let res = match (&source_schema.format, &source_schema.row_encode) {
(Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
let mut options = source_schema.gen_options().map_err(|e| anyhow!(e))?;
let (_, use_schema_registry) = get_schema_location(&mut options)?;
let mut options = WithOptions::try_from(source_schema.row_options())?;
let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
use_schema_registry
}
(Format::Debezium, Encode::Avro) => true,
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{DefaultColumnDesc, GeneratedColumnDesc};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_sqlparser::ast::{
ColumnDef, ColumnOption, DataType as AstDataType, Format, ObjectName, SourceSchemaV2,
ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Format, ObjectName,
SourceWatermark, TableConstraint,
};

Expand Down Expand Up @@ -436,7 +436,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
table_name: ObjectName,
column_defs: Vec<ColumnDef>,
constraints: Vec<TableConstraint>,
source_schema: SourceSchemaV2,
source_schema: ConnectorSchema,
source_watermarks: Vec<SourceWatermark>,
mut col_id_gen: ColumnIdGenerator,
append_only: bool,
Expand Down Expand Up @@ -749,7 +749,7 @@ pub async fn handle_create_table(
columns: Vec<ColumnDef>,
constraints: Vec<TableConstraint>,
if_not_exists: bool,
source_schema: Option<SourceSchemaV2>,
source_schema: Option<ConnectorSchema>,
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
notice: Option<String>,
Expand Down Expand Up @@ -826,8 +826,8 @@ pub async fn handle_create_table(

pub fn check_create_table_with_source(
with_options: &WithOptions,
source_schema: Option<SourceSchemaV2>,
) -> Result<Option<SourceSchemaV2>> {
source_schema: Option<ConnectorSchema>,
) -> Result<Option<ConnectorSchema>> {
if with_options.inner().contains_key(UPSTREAM_SOURCE_KEY) {
source_schema.as_ref().ok_or_else(|| {
ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
Expand Down
94 changes: 20 additions & 74 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use core::fmt;
use std::collections::BTreeMap;
use std::fmt::Write;

use itertools::Itertools;
Expand Down Expand Up @@ -105,7 +104,7 @@ pub enum SourceSchema {
}

impl SourceSchema {
pub fn into_source_schema_v2(self) -> SourceSchemaV2 {
pub fn into_source_schema_v2(self) -> ConnectorSchema {
let (format, row_encode) = match self {
SourceSchema::Protobuf(_) => (Format::Plain, Encode::Protobuf),
SourceSchema::Json => (Format::Plain, Encode::Json),
Expand Down Expand Up @@ -205,7 +204,7 @@ impl SourceSchema {
_ => vec![],
};

SourceSchemaV2 {
ConnectorSchema {
format,
row_encode,
row_options,
Expand Down Expand Up @@ -336,7 +335,7 @@ impl Encode {

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SourceSchemaV2 {
pub struct ConnectorSchema {
pub format: Format,
pub row_encode: Encode,
pub row_options: Vec<SqlOption>,
Expand All @@ -346,7 +345,7 @@ pub struct SourceSchemaV2 {
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CompatibleSourceSchema {
RowFormat(SourceSchema),
V2(SourceSchemaV2),
V2(ConnectorSchema),
}

impl fmt::Display for CompatibleSourceSchema {
Expand All @@ -363,7 +362,7 @@ impl fmt::Display for CompatibleSourceSchema {
}

impl CompatibleSourceSchema {
pub fn into_source_schema_v2(self) -> (SourceSchemaV2, Option<String>) {
pub fn into_source_schema_v2(self) -> (ConnectorSchema, Option<String>) {
match self {
CompatibleSourceSchema::RowFormat(inner) => (
inner.into_source_schema_v2(),
Expand All @@ -373,29 +372,15 @@ impl CompatibleSourceSchema {
}
}

impl From<SourceSchemaV2> for CompatibleSourceSchema {
fn from(value: SourceSchemaV2) -> Self {
impl From<ConnectorSchema> for CompatibleSourceSchema {
fn from(value: ConnectorSchema) -> Self {
Self::V2(value)
}
}

fn parse_source_schema(p: &mut Parser) -> Result<CompatibleSourceSchema, ParserError> {
if p.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) {
p.expect_keyword(Keyword::FORMAT)?;
let id = p.parse_identifier()?;
let s = id.value.to_ascii_uppercase();
let format = Format::from_keyword(&s)?;
p.expect_keyword(Keyword::ENCODE)?;
let id = p.parse_identifier()?;
let s = id.value.to_ascii_uppercase();
let row_encode = Encode::from_keyword(&s)?;
let row_options = p.parse_options()?;

Ok(CompatibleSourceSchema::V2(SourceSchemaV2 {
format,
row_encode,
row_options,
}))
if let Some(schema_v2) = p.parse_schema()? {
Ok(CompatibleSourceSchema::V2(schema_v2))
} else if p.peek_nth_any_of_keywords(0, &[Keyword::ROW])
&& p.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])
{
Expand Down Expand Up @@ -465,7 +450,7 @@ impl Parser {
// row format for nexmark source must be native
// default row format for datagen source is native
if connector.contains("-cdc") {
let expected = SourceSchemaV2::debezium_json();
let expected = ConnectorSchema::debezium_json();
if self.peek_source_schema_format() {
let schema = parse_source_schema(self)?.into_source_schema_v2().0;
if schema != expected {
Expand All @@ -477,7 +462,7 @@ impl Parser {
}
Ok(expected.into())
} else if connector.contains("nexmark") {
let expected = SourceSchemaV2::native();
let expected = ConnectorSchema::native();
if self.peek_source_schema_format() {
let schema = parse_source_schema(self)?.into_source_schema_v2().0;
if schema != expected {
Expand All @@ -492,17 +477,15 @@ impl Parser {
Ok(if self.peek_source_schema_format() {
parse_source_schema(self)?
} else {
SourceSchemaV2::native().into()
ConnectorSchema::native().into()
})
} else {
Ok(parse_source_schema(self)?)
}
}

/// Parse `FORMAT ... ENCODE ... (...)` in `CREATE SINK`.
///
/// TODO: After [`SourceSchemaV2`] and [`SinkSchema`] merge, call this in [`parse_source_schema`].
pub fn parse_schema(&mut self) -> Result<Option<SinkSchema>, ParserError> {
/// Parse `FORMAT ... ENCODE ... (...)` in `CREATE SOURCE` and `CREATE SINK`.
pub fn parse_schema(&mut self) -> Result<Option<ConnectorSchema>, ParserError> {
if !self.parse_keyword(Keyword::FORMAT) {
return Ok(None);
}
Expand All @@ -516,18 +499,18 @@ impl Parser {
let row_encode = Encode::from_keyword(&s)?;
let row_options = self.parse_options()?;

Ok(Some(SinkSchema {
Ok(Some(ConnectorSchema {
format,
row_encode,
row_options,
}))
}
}

impl SourceSchemaV2 {
impl ConnectorSchema {
/// Create a new source schema with `Debezium` format and `Json` encoding.
pub const fn debezium_json() -> Self {
SourceSchemaV2 {
ConnectorSchema {
format: Format::Debezium,
row_encode: Encode::Json,
row_options: Vec::new(),
Expand All @@ -536,35 +519,19 @@ impl SourceSchemaV2 {

/// Create a new source schema with `Native` format and encoding.
pub const fn native() -> Self {
SourceSchemaV2 {
ConnectorSchema {
format: Format::Native,
row_encode: Encode::Native,
row_options: Vec::new(),
}
}

pub fn gen_options(&self) -> Result<BTreeMap<String, String>, ParserError> {
self.row_options
.iter()
.cloned()
.map(|x| match x.value {
Value::CstyleEscapedString(s) => Ok((x.name.real_value(), s.value)),
Value::SingleQuotedString(s) => Ok((x.name.real_value(), s)),
Value::Number(n) => Ok((x.name.real_value(), n)),
Value::Boolean(b) => Ok((x.name.real_value(), b.to_string())),
_ => Err(ParserError::ParserError(
"`row format options` only support single quoted string value and C style escaped string".to_owned(),
)),
})
.try_collect()
}

pub fn row_options(&self) -> &[SqlOption] {
self.row_options.as_ref()
}
}

impl fmt::Display for SourceSchemaV2 {
impl fmt::Display for ConnectorSchema {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;

Expand Down Expand Up @@ -823,27 +790,6 @@ impl fmt::Display for CreateSink {
}
}

/// Same as [`SourceSchemaV2`]. Will be merged in a dedicated rename PR.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SinkSchema {
pub format: Format,
pub row_encode: Encode,
pub row_options: Vec<SqlOption>,
}

impl fmt::Display for SinkSchema {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;

if !self.row_options.is_empty() {
write!(f, " ({})", display_comma_separated(&self.row_options))
} else {
Ok(())
}
}
}

// sql_grammar!(CreateSinkStatement {
// if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
// sink_name: Ident,
Expand All @@ -860,7 +806,7 @@ pub struct CreateSinkStatement {
pub sink_from: CreateSink,
pub columns: Vec<Ident>,
pub emit_mode: Option<EmitMode>,
pub sink_schema: Option<SinkSchema>,
pub sink_schema: Option<ConnectorSchema>,
}

impl ParseTo for CreateSinkStatement {
Expand Down
Loading

0 comments on commit 41d2bbd

Please sign in to comment.