Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: rename ConnectorSchema/SourceSchema to FormatEncode #19174

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ impl TestCase {
columns,
constraints,
if_not_exists,
source_schema,
format_encode,
source_watermarks,
append_only,
on_conflict,
Expand All @@ -437,7 +437,7 @@ impl TestCase {
wildcard_idx,
..
} => {
let source_schema = source_schema.map(|schema| schema.into_v2_with_warning());
let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());

create_table::handle_create_table(
handler_args,
Expand All @@ -446,7 +446,7 @@ impl TestCase {
wildcard_idx,
constraints,
if_not_exists,
source_schema,
format_encode,
source_watermarks,
append_only,
on_conflict,
Expand Down
47 changes: 22 additions & 25 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_sqlparser::ast::{
CompatibleSourceSchema, ConnectorSchema, CreateSourceStatement, Encode, Format, ObjectName,
CompatibleFormatEncode, CreateSourceStatement, Encode, Format, FormatEncodeOptions, ObjectName,
SqlOption, Statement,
};
use risingwave_sqlparser::parser::Parser;
Expand Down Expand Up @@ -120,7 +120,7 @@ pub fn fetch_source_catalog_with_db_schema_id(
/// and if the FORMAT and ENCODE are modified.
pub fn check_format_encode(
original_source: &SourceCatalog,
new_connector_schema: &ConnectorSchema,
new_format_encode: &FormatEncodeOptions,
) -> Result<()> {
let StreamSourceInfo {
format, row_encode, ..
Expand All @@ -137,9 +137,7 @@ pub fn check_format_encode(
.into());
};

if new_connector_schema.format != old_format
|| new_connector_schema.row_encode != old_row_encode
{
if new_format_encode.format != old_format || new_format_encode.row_encode != old_row_encode {
bail_not_implemented!(
"the original definition is FORMAT {:?} ENCODE {:?}, and altering them is not supported yet",
&old_format,
Expand All @@ -153,19 +151,18 @@ pub fn check_format_encode(
/// Refresh the source registry and get the added/dropped columns.
pub async fn refresh_sr_and_get_columns_diff(
original_source: &SourceCatalog,
connector_schema: &ConnectorSchema,
format_encode: &FormatEncodeOptions,
session: &Arc<SessionImpl>,
) -> Result<(StreamSourceInfo, Vec<ColumnCatalog>, Vec<ColumnCatalog>)> {
let mut with_properties = original_source.with_properties.clone();
validate_compatibility(connector_schema, &mut with_properties)?;
validate_compatibility(format_encode, &mut with_properties)?;

if with_properties.is_cdc_connector() {
bail_not_implemented!("altering a cdc source is not supported");
}

let (Some(columns_from_resolve_source), source_info) =
bind_columns_from_source(session, connector_schema, Either::Right(&with_properties))
.await?
bind_columns_from_source(session, format_encode, Either::Right(&with_properties)).await?
else {
// Source without schema registry is rejected.
unreachable!("source without schema registry is rejected")
Expand All @@ -189,33 +186,33 @@ pub async fn refresh_sr_and_get_columns_diff(
Ok((source_info, added_columns, dropped_columns))
}

fn get_connector_schema_from_source(source: &SourceCatalog) -> Result<ConnectorSchema> {
fn get_format_encode_from_source(source: &SourceCatalog) -> Result<FormatEncodeOptions> {
let [stmt]: [_; 1] = Parser::parse_sql(&source.definition)
.context("unable to parse original source definition")?
.try_into()
.unwrap();
let Statement::CreateSource {
stmt: CreateSourceStatement { source_schema, .. },
stmt: CreateSourceStatement { format_encode, .. },
} = stmt
else {
unreachable!()
};
Ok(source_schema.into_v2_with_warning())
Ok(format_encode.into_v2_with_warning())
}

pub async fn handler_refresh_schema(
handler_args: HandlerArgs,
name: ObjectName,
) -> Result<RwPgResponse> {
let (source, _, _) = fetch_source_catalog_with_db_schema_id(&handler_args.session, &name)?;
let connector_schema = get_connector_schema_from_source(&source)?;
handle_alter_source_with_sr(handler_args, name, connector_schema).await
let format_encode = get_format_encode_from_source(&source)?;
handle_alter_source_with_sr(handler_args, name, format_encode).await
}

pub async fn handle_alter_source_with_sr(
handler_args: HandlerArgs,
name: ObjectName,
connector_schema: ConnectorSchema,
format_encode: FormatEncodeOptions,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let (source, database_id, schema_id) = fetch_source_catalog_with_db_schema_id(&session, &name)?;
Expand All @@ -232,9 +229,9 @@ pub async fn handle_alter_source_with_sr(
bail_not_implemented!(issue = 16003, "alter shared source");
}

check_format_encode(&source, &connector_schema)?;
check_format_encode(&source, &format_encode)?;

if !schema_has_schema_registry(&connector_schema) {
if !schema_has_schema_registry(&format_encode) {
return Err(ErrorCode::NotSupported(
"altering a source without schema registry".to_string(),
"try `ALTER SOURCE .. ADD COLUMN ...` instead".to_string(),
Expand All @@ -243,7 +240,7 @@ pub async fn handle_alter_source_with_sr(
}

let (source_info, added_columns, dropped_columns) =
refresh_sr_and_get_columns_diff(&source, &connector_schema, &session).await?;
refresh_sr_and_get_columns_diff(&source, &format_encode, &session).await?;

if !dropped_columns.is_empty() {
bail_not_implemented!(
Expand All @@ -258,10 +255,10 @@ pub async fn handle_alter_source_with_sr(
source.info = source_info;
source.columns.extend(added_columns);
source.definition =
alter_definition_format_encode(&source.definition, connector_schema.row_options.clone())?;
alter_definition_format_encode(&source.definition, format_encode.row_options.clone())?;

let (format_encode_options, format_encode_secret_ref) = resolve_secret_ref_in_with_options(
WithOptions::try_from(connector_schema.row_options())?,
WithOptions::try_from(format_encode.row_options())?,
session.as_ref(),
)?
.into_parts();
Expand Down Expand Up @@ -299,19 +296,19 @@ pub fn alter_definition_format_encode(

match &mut stmt {
Statement::CreateSource {
stmt: CreateSourceStatement { source_schema, .. },
stmt: CreateSourceStatement { format_encode, .. },
}
| Statement::CreateTable {
source_schema: Some(source_schema),
format_encode: Some(format_encode),
..
} => {
match source_schema {
CompatibleSourceSchema::V2(schema) => {
match format_encode {
CompatibleFormatEncode::V2(schema) => {
schema.row_options = format_encode_options;
}
// TODO: Confirm the behavior of legacy source schema.
// Legacy source schema should be rejected by the handler and never reaches here.
CompatibleSourceSchema::RowFormat(_schema) => unreachable!(),
CompatibleFormatEncode::RowFormat(_schema) => unreachable!(),
}
}
_ => unreachable!(),
Expand Down
30 changes: 15 additions & 15 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Encode,
ObjectName, Statement, StructField,
AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Encode,
FormatEncodeOptions, ObjectName, Statement, StructField,
};
use risingwave_sqlparser::parser::Parser;

Expand All @@ -51,14 +51,14 @@ pub async fn replace_table_with_definition(
table_name: ObjectName,
definition: Statement,
original_catalog: &Arc<TableCatalog>,
source_schema: Option<ConnectorSchema>,
format_encode: Option<FormatEncodeOptions>,
) -> Result<()> {
let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
session,
table_name,
definition,
original_catalog,
source_schema,
format_encode,
None,
)
.await?;
Expand Down Expand Up @@ -86,15 +86,15 @@ pub async fn get_new_table_definition_for_cdc_table(
.unwrap();
let Statement::CreateTable {
columns: original_columns,
source_schema,
format_encode,
..
} = &mut definition
else {
panic!("unexpected statement: {:?}", definition);
};

assert!(
source_schema.is_none(),
format_encode.is_none(),
"source schema should be None for CDC table"
);

Expand Down Expand Up @@ -165,7 +165,7 @@ pub async fn get_replace_table_plan(
table_name: ObjectName,
definition: Statement,
original_catalog: &Arc<TableCatalog>,
source_schema: Option<ConnectorSchema>,
format_encode: Option<FormatEncodeOptions>,
new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
) -> Result<(
Option<Source>,
Expand Down Expand Up @@ -196,7 +196,7 @@ pub async fn get_replace_table_plan(
session,
table_name,
original_catalog,
source_schema,
format_encode,
handler_args.clone(),
col_id_gen,
columns.clone(),
Expand Down Expand Up @@ -326,19 +326,19 @@ pub async fn handle_alter_table_column(
.unwrap();
let Statement::CreateTable {
columns,
source_schema,
format_encode,
..
} = &mut definition
else {
panic!("unexpected statement: {:?}", definition);
};
let source_schema = source_schema
let format_encode = format_encode
.clone()
.map(|source_schema| source_schema.into_v2_with_warning());
.map(|format_encode| format_encode.into_v2_with_warning());

let fail_if_has_schema_registry = || {
if let Some(source_schema) = &source_schema
&& schema_has_schema_registry(source_schema)
if let Some(format_encode) = &format_encode
&& schema_has_schema_registry(format_encode)
{
Err(ErrorCode::NotSupported(
"alter table with schema registry".to_string(),
Expand Down Expand Up @@ -460,14 +460,14 @@ pub async fn handle_alter_table_column(
table_name,
definition,
&original_catalog,
source_schema,
format_encode,
)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

pub fn schema_has_schema_registry(schema: &ConnectorSchema) -> bool {
pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool {
match schema.row_encode {
Encode::Avro | Encode::Protobuf => true,
Encode::Json => {
Expand Down
20 changes: 10 additions & 10 deletions src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use anyhow::{anyhow, Context};
use fancy_regex::Regex;
use pgwire::pg_response::StatementType;
use risingwave_common::bail_not_implemented;
use risingwave_sqlparser::ast::{ConnectorSchema, ObjectName, Statement};
use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport;

Expand All @@ -29,15 +29,15 @@ use super::{HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result};
use crate::TableCatalog;

fn get_connector_schema_from_table(table: &TableCatalog) -> Result<Option<ConnectorSchema>> {
fn get_format_encode_from_table(table: &TableCatalog) -> Result<Option<FormatEncodeOptions>> {
let [stmt]: [_; 1] = Parser::parse_sql(&table.definition)
.context("unable to parse original table definition")?
.try_into()
.unwrap();
let Statement::CreateTable { source_schema, .. } = stmt else {
let Statement::CreateTable { format_encode, .. } = stmt else {
unreachable!()
};
Ok(source_schema.map(|schema| schema.into_v2_with_warning()))
Ok(format_encode.map(|schema| schema.into_v2_with_warning()))
}

pub async fn handle_refresh_schema(
Expand All @@ -51,9 +51,9 @@ pub async fn handle_refresh_schema(
bail_not_implemented!("alter table with incoming sinks");
}

let connector_schema = {
let connector_schema = get_connector_schema_from_table(&original_table)?;
if !connector_schema
let format_encode = {
let format_encode = get_format_encode_from_table(&original_table)?;
if !format_encode
.as_ref()
.is_some_and(schema_has_schema_registry)
{
Expand All @@ -63,12 +63,12 @@ pub async fn handle_refresh_schema(
)
.into());
}
connector_schema.unwrap()
format_encode.unwrap()
};

let definition = alter_definition_format_encode(
&original_table.definition,
connector_schema.row_options.clone(),
format_encode.row_options.clone(),
)?;

let [definition]: [_; 1] = Parser::parse_sql(&definition)
Expand All @@ -81,7 +81,7 @@ pub async fn handle_refresh_schema(
table_name,
definition,
&original_table,
Some(connector_schema),
Some(format_encode),
)
.await;

Expand Down
Loading
Loading