Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: some CDC table's code #19255

Merged
merged 12 commits into from
Nov 8, 2024
88 changes: 43 additions & 45 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Encode,
FormatEncodeOptions, ObjectName, Statement, StructField,
FormatEncodeOptions, Ident, ObjectName, Statement, StructField, TableConstraint,
};
use risingwave_sqlparser::parser::Parser;

Expand All @@ -43,34 +43,10 @@ use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{Expr, ExprImpl, InputRef, Literal};
use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
use crate::handler::create_table::bind_table_constraints;
use crate::session::SessionImpl;
use crate::{Binder, TableCatalog, WithOptions};

/// Replaces an existing table with a newly parsed `CREATE TABLE` definition.
///
/// Plans the replacement via [`get_replace_table_plan`] (building the stream
/// graph, column-index mapping, and job type as if the table were created
/// fresh), then submits the plan to the catalog service.
///
/// # Errors
///
/// Propagates any error from planning the replacement or from the catalog
/// writer RPC.
pub async fn replace_table_with_definition(
    session: &Arc<SessionImpl>,
    table_name: ObjectName,
    definition: Statement,
    original_catalog: &Arc<TableCatalog>,
    format_encode: Option<FormatEncodeOptions>,
) -> Result<()> {
    // Derive the full replacement plan from the altered definition. The last
    // argument (`new_version_columns`) is `None` here: it is only supplied by
    // the auto-schema-change path.
    let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
        session,
        table_name,
        definition,
        original_catalog,
        format_encode,
        None,
    )
    .await?;

    // Hand the plan to the meta service through the session's catalog writer.
    session
        .catalog_writer()?
        .replace_table(source, table, graph, col_index_mapping, job_type)
        .await?;

    Ok(())
}

/// Used in auto schema change process
pub async fn get_new_table_definition_for_cdc_table(
session: &Arc<SessionImpl>,
Expand All @@ -84,9 +60,11 @@ pub async fn get_new_table_definition_for_cdc_table(
.context("unable to parse original table definition")?
.try_into()
.unwrap();

let Statement::CreateTable {
columns: original_columns,
format_encode,
constraints,
..
} = &mut definition
else {
Expand All @@ -98,6 +76,22 @@ pub async fn get_new_table_definition_for_cdc_table(
"source schema should be None for CDC table"
);

if bind_table_constraints(constraints).unwrap().is_empty() {
st1page marked this conversation as resolved.
Show resolved Hide resolved
// For table created by `create table t (*)` the constraint is empty, we need to
// retrieve primary key names from original table catalog if available
let pk_names: Vec<_> = original_catalog
.pk
.iter()
.map(|x| original_catalog.columns[x.column_index].name().to_string())
.collect();

constraints.push(TableConstraint::Unique {
name: None,
columns: pk_names.iter().map(Ident::new_unchecked).collect(),
is_primary: true,
});
}

let orig_column_catalog: HashMap<String, ColumnCatalog> = HashMap::from_iter(
original_catalog
.columns()
Expand Down Expand Up @@ -163,9 +157,8 @@ fn to_ast_data_type(ty: &DataType) -> Result<AstDataType> {
pub async fn get_replace_table_plan(
session: &Arc<SessionImpl>,
table_name: ObjectName,
definition: Statement,
original_catalog: &Arc<TableCatalog>,
format_encode: Option<FormatEncodeOptions>,
new_definition: Statement,
old_catalog: &Arc<TableCatalog>,
new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
) -> Result<(
Option<Source>,
Expand All @@ -175,8 +168,8 @@ pub async fn get_replace_table_plan(
TableJobType,
)> {
// Create handler args as if we're creating a new table with the altered definition.
let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
let col_id_gen = ColumnIdGenerator::new_alter(original_catalog);
let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
let Statement::CreateTable {
columns,
constraints,
Expand All @@ -186,16 +179,21 @@ pub async fn get_replace_table_plan(
with_version_column,
wildcard_idx,
cdc_table_info,
format_encode,
..
} = definition
} = new_definition
else {
panic!("unexpected statement type: {:?}", definition);
panic!("unexpected statement type: {:?}", new_definition);
};

let format_encode = format_encode
.clone()
.map(|format_encode| format_encode.into_v2_with_warning());

let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
session,
table_name,
original_catalog,
old_catalog,
format_encode,
handler_args.clone(),
col_id_gen,
Expand All @@ -213,7 +211,7 @@ pub async fn get_replace_table_plan(

// Calculate the mapping from the original columns to the new columns.
let col_index_mapping = ColIndexMapping::new(
original_catalog
old_catalog
.columns()
.iter()
.map(|old_c| {
Expand All @@ -225,7 +223,7 @@ pub async fn get_replace_table_plan(
table.columns.len(),
);

let incoming_sink_ids: HashSet<_> = original_catalog.incoming_sinks.iter().copied().collect();
let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();

let target_columns = table
.columns
Expand All @@ -245,7 +243,7 @@ pub async fn get_replace_table_plan(
// Set some fields ourselves so that the meta service does not need to maintain them.
let mut table = table;
table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
table.maybe_vnode_count = VnodeCount::set(original_catalog.vnode_count()).to_protobuf();
table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();

Ok((source, table, graph, col_index_mapping, job_type))
}
Expand Down Expand Up @@ -332,6 +330,7 @@ pub async fn handle_alter_table_column(
else {
panic!("unexpected statement: {:?}", definition);
};

let format_encode = format_encode
.clone()
.map(|format_encode| format_encode.into_v2_with_warning());
Expand Down Expand Up @@ -455,15 +454,14 @@ pub async fn handle_alter_table_column(
_ => unreachable!(),
};

replace_table_with_definition(
&session,
table_name,
definition,
&original_catalog,
format_encode,
)
.await?;
let (source, table, graph, col_index_mapping, job_type) =
get_replace_table_plan(&session, table_name, definition, &original_catalog, None).await?;

let catalog_writer = session.catalog_writer()?;

catalog_writer
.replace_table(source, table, graph, col_index_mapping, job_type)
.await?;
Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

Expand Down
64 changes: 34 additions & 30 deletions src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,16 @@

use anyhow::{anyhow, Context};
use fancy_regex::Regex;
use pgwire::pg_response::StatementType;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport;

use super::alter_source_with_sr::alter_definition_format_encode;
use super::alter_table_column::{
fetch_table_catalog_for_alter, replace_table_with_definition, schema_has_schema_registry,
};
use super::alter_table_column::{fetch_table_catalog_for_alter, schema_has_schema_registry};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
use super::{get_replace_table_plan, HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result};
use crate::TableCatalog;

Expand Down Expand Up @@ -66,6 +64,7 @@ pub async fn handle_refresh_schema(
format_encode.unwrap()
};

// NOTE(st1page): since we have not implemented alter format encode for table, it is actually no use.
let definition = alter_definition_format_encode(
&original_table.definition,
format_encode.row_options.clone(),
Expand All @@ -76,31 +75,36 @@ pub async fn handle_refresh_schema(
.try_into()
.unwrap();

let result = replace_table_with_definition(
&session,
table_name,
definition,
&original_table,
Some(format_encode),
)
.await;

match result {
Ok(_) => Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE)),
Err(e) => {
let report = e.to_report_string();
// This is a workaround for reporting errors when columns to drop is referenced by generated column.
// Finding the actual columns to drop requires generating `PbSource` from the sql definition
// and fetching schema from schema registry, which will cause a lot of unnecessary refactor.
// Here we match the error message to yield when failing to bind generated column exprs.
let re = Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
let captures = re.captures(&report).map_err(anyhow::Error::from)?;
if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) {
Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"",
gen_col_name.as_str())).into())
} else {
Err(e)
let (source, table, graph, col_index_mapping, job_type) = {
let result =
get_replace_table_plan(&session, table_name, definition, &original_table, None).await;
match result {
Ok((source, table, graph, col_index_mapping, job_type)) => {
Ok((source, table, graph, col_index_mapping, job_type))
}
Err(e) => {
let report = e.to_report_string();
// NOTE(yuhao): This is a workaround for reporting errors when columns to drop is referenced by generated column.
// Finding the actual columns to drop requires generating `PbSource` from the sql definition
// and fetching schema from schema registry, which will cause a lot of unnecessary refactor.
// Here we match the error message to yield when failing to bind generated column exprs.
let re =
Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
let captures = re.captures(&report).map_err(anyhow::Error::from)?;
if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) {
Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"",
gen_col_name.as_str())).into())
} else {
Err(e)
}
}
}
}
}?;
let catalog_writer = session.catalog_writer()?;

catalog_writer
.replace_table(source, table, graph, col_index_mapping, job_type)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}
5 changes: 2 additions & 3 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ use crate::error::{Result, RwError};
use crate::expr::Expr;
use crate::handler::create_table::{
bind_pk_and_row_id_on_relation, bind_sql_column_constraints, bind_sql_columns,
bind_sql_pk_names, ensure_table_constraints_supported, ColumnIdGenerator,
bind_sql_pk_names, bind_table_constraints, ColumnIdGenerator,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::handler::HandlerArgs;
Expand Down Expand Up @@ -1523,8 +1523,7 @@ pub async fn bind_create_source_or_table_with_connector(
}
}

ensure_table_constraints_supported(&constraints)?;
let sql_pk_names = bind_sql_pk_names(sql_columns_defs, &constraints)?;
let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;

let columns_from_sql = bind_sql_columns(sql_columns_defs)?;

Expand Down
Loading
Loading