From 6f86b9231df43b921a0b3c8fba4516d950ff7a23 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 18 Jun 2024 00:20:49 +0800 Subject: [PATCH 01/18] alter cdc table --- proto/ddl_service.proto | 1 + src/frontend/src/catalog/catalog_service.rs | 6 +- .../src/handler/alter_table_column.rs | 25 +++- .../src/handler/alter_table_with_sr.rs | 1 + src/frontend/src/handler/create_sink.rs | 7 +- src/frontend/src/handler/create_table.rs | 136 +++++++++++++----- src/frontend/src/handler/drop_sink.rs | 3 +- src/frontend/src/stream_fragmenter/mod.rs | 2 +- src/frontend/src/test_utils.rs | 3 +- src/meta/service/src/ddl_service.rs | 10 +- src/meta/src/rpc/ddl_controller.rs | 48 +++++-- src/meta/src/stream/stream_graph/fragment.rs | 21 +++ src/rpc_client/src/meta_client.rs | 2 + 13 files changed, 200 insertions(+), 65 deletions(-) diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 46c2a5c22ff6d..7c6a078ce9ce4 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -324,6 +324,7 @@ message ReplaceTablePlan { catalog.ColIndexMapping table_col_index_mapping = 3; // Source catalog of table's associated source catalog.Source source = 4; + TableJobType job_type = 5; } message ReplaceTablePlanRequest { diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index f2bcdd2b62e12..39b3fb2501f41 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -26,7 +26,7 @@ use risingwave_pb::catalog::{ use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_name_request, alter_set_schema_request, create_connection_request, PbReplaceTablePlan, - PbTableJobType, ReplaceTablePlan, + PbTableJobType, ReplaceTablePlan, TableJobType, }; use risingwave_pb::meta::PbTableParallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; @@ -92,6 +92,7 @@ pub trait CatalogWriter: Send + Sync { table: PbTable, graph: StreamFragmentGraph, mapping: ColIndexMapping, + job_type: TableJobType, ) -> Result<()>; async fn alter_source_column(&self, source: PbSource) -> Result<()>; @@ -316,10 +317,11 @@ impl CatalogWriter for CatalogWriterImpl { table: PbTable, graph: StreamFragmentGraph, mapping: ColIndexMapping, + job_type: TableJobType, ) -> Result<()> { let version = self .meta_client - .replace_table(source, table, graph, mapping) + .replace_table(source, table, graph, mapping, job_type) .await?; self.wait_version(version).await } diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 0143a0e8367a3..0e0e7d49f1108 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -18,6 +18,7 @@ use anyhow::Context; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; +use risingwave_common::catalog::ColumnCatalog; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_sqlparser::ast::{ AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement, @@ -25,7 +26,7 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::Parser; use super::create_source::get_json_schema_location; -use super::create_table::{generate_stream_graph_for_table, ColumnIdGenerator}; +use super::create_table::{bind_sql_columns, generate_stream_graph_for_table, ColumnIdGenerator}; use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; 
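// Note: Patch 01 threads a `TableJobType` through the whole replace-table path so the meta
// node can tell a plain table apart from one backed by a shared CDC source: `ReplaceTablePlan`
// gains `job_type = 5`, and `CatalogWriter::replace_table` / `MetaClient::replace_table` take
// the extra argument. The resulting frontend call shape, as it appears later in this diff:
//
//     catalog_writer
//         .replace_table(source, table, graph, col_index_mapping, job_type)
//         .await?;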
use crate::catalog::root_catalog::SchemaPath; @@ -39,11 +40,13 @@ pub async fn replace_table_with_definition( table_name: ObjectName, definition: Statement, original_catalog: &Arc, + columns_altered: Vec, source_schema: Option, ) -> Result<()> { // Create handler args as if we're creating a new table with the altered definition. let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?; let col_id_gen = ColumnIdGenerator::new_alter(original_catalog); + // TODO: We need to handle cdc table info here let Statement::CreateTable { columns, constraints, @@ -52,13 +55,14 @@ pub async fn replace_table_with_definition( on_conflict, with_version_column, wildcard_idx, + cdc_table_info, .. } = definition else { panic!("unexpected statement type: {:?}", definition); }; - let (graph, table, source) = generate_stream_graph_for_table( + let (graph, table, source, job_type) = generate_stream_graph_for_table( session, table_name, original_catalog, @@ -66,12 +70,14 @@ pub async fn replace_table_with_definition( handler_args, col_id_gen, columns, + columns_altered, wildcard_idx, constraints, source_watermarks, append_only, on_conflict, with_version_column, + cdc_table_info, ) .await?; @@ -91,8 +97,9 @@ pub async fn replace_table_with_definition( let catalog_writer = session.catalog_writer()?; + // TODO: pass job_type to DdlService catalog_writer - .replace_table(source, table, graph, col_index_mapping) + .replace_table(source, table, graph, col_index_mapping, job_type) .await?; Ok(()) } @@ -145,6 +152,7 @@ pub async fn handle_alter_table_column( } } + let mut columns_altered = original_catalog.columns.clone(); match operation { AlterTableOperation::AddColumn { column_def: new_column, @@ -172,7 +180,8 @@ pub async fn handle_alter_table_column( } // Add the new column to the table definition. - columns.push(new_column); + columns.push(new_column.clone()); + columns_altered.extend(bind_sql_columns(vec![new_column].as_slice())?) 
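// Note: at this point in the series the handler keeps a separate `columns_altered` catalog
// (seeded from the original table's columns and extended via `bind_sql_columns`) next to the
// SQL-level `columns` list. A later patch in this series ("fix comments") removes the extra
// parameter and re-derives the CDC table schema from the column definitions with
// `derive_schema_for_cdc_table`, so `columns_altered` is only an intermediate step.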
} AlterTableOperation::DropColumn { @@ -194,6 +203,10 @@ pub async fn handle_alter_table_column( if removed_column.is_some() { // PASS + // retain untouch columns + columns_altered.retain(|col| { + !(col.name() == removed_column.as_ref().unwrap().name.real_value().as_str()) + }) } else if if_exists { return Ok(PgResponse::builder(StatementType::ALTER_TABLE) .notice(format!( @@ -210,13 +223,15 @@ pub async fn handle_alter_table_column( } _ => unreachable!(), - } + }; + // Siyuan: 上面是把新增的column添加到column list里 replace_table_with_definition( &session, table_name, definition, &original_catalog, + columns_altered, source_schema, ) .await?; diff --git a/src/frontend/src/handler/alter_table_with_sr.rs b/src/frontend/src/handler/alter_table_with_sr.rs index c1700c36a3c90..3e784d6255d59 100644 --- a/src/frontend/src/handler/alter_table_with_sr.rs +++ b/src/frontend/src/handler/alter_table_with_sr.rs @@ -86,6 +86,7 @@ pub async fn handle_refresh_schema( table_name, definition, &original_table, + vec![], Some(connector_schema), ) .await?; diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index f6b6e5f563d2d..73f33308804dc 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -32,7 +32,7 @@ use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL, }; use risingwave_pb::catalog::{PbSource, Table}; -use risingwave_pb::ddl_service::ReplaceTablePlan; +use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode}; @@ -466,6 +466,7 @@ pub async fn handle_create_sink( table: Some(table), fragment_graph: Some(graph), table_col_index_mapping: None, + job_type: TableJobType::General as _, }); } @@ -634,7 +635,7 @@ pub(crate) async fn reparse_table_for_sink( panic!("unexpected statement type: {:?}", definition); }; - let (graph, table, source) = generate_stream_graph_for_table( + let (graph, table, source, _) = generate_stream_graph_for_table( session, table_name, table_catalog, @@ -642,12 +643,14 @@ pub(crate) async fn reparse_table_for_sink( handler_args, col_id_gen, columns, + vec![], wildcard_idx, constraints, source_watermarks, append_only, on_conflict, with_version_column, + None, ) .await?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 0f3693653ced5..350a2053fc5d7 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1226,50 +1226,114 @@ pub async fn generate_stream_graph_for_table( handler_args: HandlerArgs, col_id_gen: ColumnIdGenerator, columns: Vec, + column_catalogs: Vec, wildcard_idx: Option, constraints: Vec, source_watermarks: Vec, append_only: bool, on_conflict: Option, with_version_column: Option, -) -> Result<(StreamFragmentGraph, Table, Option)> { + cdc_table_info: Option, +) -> Result<(StreamFragmentGraph, Table, Option, TableJobType)> { use risingwave_pb::catalog::table::OptionalAssociatedSourceId; - let (plan, source, table) = match source_schema { - Some(source_schema) => { - gen_create_table_plan_with_source( - handler_args, - ExplainOptions::default(), - table_name, - columns, - wildcard_idx, - constraints, - source_schema, - source_watermarks, - col_id_gen, - append_only, - on_conflict, - 
with_version_column, - vec![], + let ((plan, source, table), job_type) = + match (source_schema, cdc_table_info.as_ref()) { + (Some(source_schema), None) => ( + gen_create_table_plan_with_source( + handler_args, + ExplainOptions::default(), + table_name, + columns, + wildcard_idx, + constraints, + source_schema, + source_watermarks, + col_id_gen, + append_only, + on_conflict, + with_version_column, + vec![], + ) + .await?, + TableJobType::General, + ), + (None, None) => { + let context = OptimizerContext::from_handler_args(handler_args); + let (plan, table) = gen_create_table_plan( + context, + table_name, + columns, + constraints, + col_id_gen, + source_watermarks, + append_only, + on_conflict, + with_version_column, + )?; + ((plan, None, table), TableJobType::General) + } + (None, Some(cdc_table)) => { + let session = &handler_args.session; + let db_name = session.database(); + let (schema_name, resolved_table_name) = + Binder::resolve_schema_qualified_name(db_name, table_name)?; + let (database_id, schema_id) = + session.get_database_and_schema_id_for_create(schema_name.clone())?; + + let (source_schema, source_name) = + Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?; + + let source = { + let catalog_reader = session.env().catalog_reader().read_guard(); + let schema_name = source_schema + .clone() + .unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); + let (source, _) = catalog_reader.get_source_by_name( + db_name, + SchemaPath::Name(schema_name.as_str()), + source_name.as_str(), + )?; + source.clone() + }; + + let connect_properties = derive_connect_properties( + &source.with_properties, + cdc_table.external_table_name.clone(), + )?; + + let pk_names = original_catalog + .stream_key + .iter() + .map(|pk_index| original_catalog.columns[*pk_index].name().to_string()) + .collect(); + + let (plan, table) = gen_create_table_plan_for_cdc_table( + handler_args, + ExplainOptions::default(), + source, + cdc_table.external_table_name.clone(), + column_catalogs, + pk_names, + connect_properties, + col_id_gen, + on_conflict, + with_version_column, + vec![], + resolved_table_name, + database_id, + schema_id, + )?; + + ((plan, None, table), TableJobType::SharedCdcSource) + } + (Some(_), Some(_)) => return Err(ErrorCode::NotSupported( + "Data format and encoding format doesn't apply to table created from a CDC source" + .into(), + "Remove the FORMAT and ENCODE specification".into(), ) - .await? - } - None => { - let context = OptimizerContext::from_handler_args(handler_args); - let (plan, table) = gen_create_table_plan( - context, - table_name, - columns, - constraints, - col_id_gen, - source_watermarks, - append_only, - on_conflict, - with_version_column, - )?; - (plan, None, table) - } - }; + .into()), + }; // TODO: avoid this backward conversion. if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() { @@ -1297,7 +1361,7 @@ pub async fn generate_stream_graph_for_table( ..table }; - Ok((graph, table, source)) + Ok((graph, table, source, job_type)) } #[cfg(test)] diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs index a42605ad1c856..7df3deb9209e9 100644 --- a/src/frontend/src/handler/drop_sink.rs +++ b/src/frontend/src/handler/drop_sink.rs @@ -13,7 +13,7 @@ // limitations under the License. 
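// Note: the `(None, Some(cdc_table))` arm added above is what lets ALTER TABLE ADD/DROP COLUMN
// regenerate the stream graph for a table created from a shared CDC source, returning
// `TableJobType::SharedCdcSource` instead of `General`. The user-facing flow this enables looks
// roughly like the following (hypothetical source/table names, assuming a `mysql-cdc` source
// named `mysql_source` already exists):
//
//     CREATE TABLE t (id INT PRIMARY KEY, v VARCHAR) FROM mysql_source TABLE 'mydb.t';
//     ALTER TABLE t ADD COLUMN w DECIMAL;
//     ALTER TABLE t DROP COLUMN w;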
use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_pb::ddl_service::ReplaceTablePlan; +use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType}; use risingwave_sqlparser::ast::ObjectName; use super::RwPgResponse; @@ -89,6 +89,7 @@ pub async fn handle_drop_sink( table: Some(table), fragment_graph: Some(graph), table_col_index_mapping: None, + job_type: TableJobType::General as _, }); } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index f2d768cc076f4..7596a5544b391 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -308,7 +308,7 @@ fn build_fragment( // memorize upstream source id for later use state .dependent_table_ids - .insert(TableId::new(node.upstream_source_id)); + .insert(node.upstream_source_id.into()); current_fragment .upstream_table_ids .push(node.upstream_source_id); diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 97ab9f4eecdc9..08549aaaab284 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -42,7 +42,7 @@ use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_set_schema_request, create_connection_request, DdlProgress, PbTableJobType, - ReplaceTablePlan, + ReplaceTablePlan, TableJobType, }; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ @@ -308,6 +308,7 @@ impl CatalogWriter for MockCatalogWriter { table: PbTable, _graph: StreamFragmentGraph, _mapping: ColIndexMapping, + _job_type: TableJobType, ) -> Result<()> { self.catalog.write().update_table(&table); Ok(()) diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 2e4ba23e02d8f..2b2b1bcfeb2b3 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -82,6 +82,7 @@ impl DdlServiceImpl { } fn extract_replace_table_info(change: ReplaceTablePlan) -> ReplaceTableInfo { + let job_type = change.get_job_type().unwrap_or_default(); let mut source = change.source; let mut fragment_graph = change.fragment_graph.unwrap(); let mut table = change.table.unwrap(); @@ -89,19 +90,14 @@ impl DdlServiceImpl { table.optional_associated_source_id { source.as_mut().unwrap().id = source_id; - fill_table_stream_graph_info( - &mut source, - &mut table, - TableJobType::General, - &mut fragment_graph, - ); + fill_table_stream_graph_info(&mut source, &mut table, job_type, &mut fragment_graph); } let table_col_index_mapping = change .table_col_index_mapping .as_ref() .map(ColIndexMapping::from_protobuf); - let stream_job = StreamingJob::Table(source, table, TableJobType::General); + let stream_job = StreamingJob::Table(source, table, job_type); ReplaceTableInfo { streaming_job: stream_job, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index e1c6ebb0f32a8..3f5592d4afa5f 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -21,7 +21,7 @@ use std::time::Duration; use aes_siv::aead::generic_array::GenericArray; use aes_siv::aead::Aead; use aes_siv::{Aes128SivAead, KeyInit}; -use anyhow::Context; +use anyhow::{anyhow, Context}; use itertools::Itertools; use rand::{Rng, RngCore}; use risingwave_common::config::DefaultParallelism; @@ -71,9 +71,9 @@ use tracing::Instrument; use crate::barrier::BarrierManagerRef; use crate::error::MetaErrorInner; use crate::manager::{ - CatalogManagerRef, 
ConnectionId, DatabaseId, FragmentManagerRef, FunctionId, IdCategory, - IdCategoryType, IndexId, LocalNotification, MetaSrvEnv, MetadataManager, MetadataManagerV1, - NotificationVersion, RelationIdEnum, SchemaId, SecretId, SinkId, SourceId, + CatalogManagerRef, ConnectionId, DatabaseId, DdlType, FragmentManagerRef, FunctionId, + IdCategory, IdCategoryType, IndexId, LocalNotification, MetaSrvEnv, MetadataManager, + MetadataManagerV1, NotificationVersion, RelationIdEnum, SchemaId, SecretId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, StreamingJobDiscriminants, SubscriptionId, TableId, UserId, ViewId, IGNORED_NOTIFICATION_VERSION, }; @@ -1993,6 +1993,14 @@ impl DdlController { .mview_fragment() .expect("mview fragment not found"); + let ddl_type = DdlType::from(stream_job); + let DdlType::Table(table_job_type) = &ddl_type else { + bail!( + "only support replacing table streaming job, ddl_type: {:?}", + ddl_type + ) + }; + // Map the column indices in the dispatchers with the given mapping. let downstream_fragments = self.metadata_manager.get_downstream_chain_fragments(id).await? .into_iter() @@ -2010,12 +2018,32 @@ impl DdlController { ) })?; - let complete_graph = CompleteStreamFragmentGraph::with_downstreams( - fragment_graph, - original_table_fragment.fragment_id, - downstream_fragments, - stream_job.into(), - )?; + let complete_graph = match table_job_type { + TableJobType::General => CompleteStreamFragmentGraph::with_downstreams( + fragment_graph, + original_table_fragment.fragment_id, + downstream_fragments, + ddl_type, + )?, + + TableJobType::SharedCdcSource => { + // get the upstream fragment + let upstream_root_fragments = self + .metadata_manager + .get_upstream_root_fragments(fragment_graph.dependent_table_ids()) + .await?; + CompleteStreamFragmentGraph::with_upstreams_and_downstreams( + fragment_graph, + upstream_root_fragments, + original_table_fragment.fragment_id, + downstream_fragments, + ddl_type, + )? + } + TableJobType::Unspecified => { + unreachable!() + } + }; // 2. Build the actor graph. let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?; diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index e347dd0287f36..47bf446f1555d 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -606,6 +606,27 @@ impl CompleteStreamFragmentGraph { ) } + /// For replacing an existing table based on shared cdc source + pub fn with_upstreams_and_downstreams( + graph: StreamFragmentGraph, + upstream_root_fragments: HashMap, + original_table_fragment_id: FragmentId, + downstream_fragments: Vec<(DispatchStrategy, Fragment)>, + ddl_type: DdlType, + ) -> MetaResult { + Self::build_helper( + graph, + Some(FragmentGraphUpstreamContext { + upstream_root_fragments, + }), + Some(FragmentGraphDownstreamContext { + original_table_fragment_id, + downstream_fragments, + }), + ddl_type, + ) + } + /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments. 
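// Note: `with_upstreams_and_downstreams` (added just above) is the piece that makes replacement
// work for CDC tables: unlike `with_downstreams`, it also wires in the upstream shared CDC
// source fragment. The meta-side caller in ddl_controller.rs (earlier in this patch) selects
// the constructor per job type, roughly:
//
//     let complete_graph = match table_job_type {
//         TableJobType::General => CompleteStreamFragmentGraph::with_downstreams(
//             fragment_graph,
//             original_table_fragment.fragment_id,
//             downstream_fragments,
//             ddl_type,
//         )?,
//         TableJobType::SharedCdcSource => {
//             // upstream_root_fragments is resolved first via get_upstream_root_fragments(...)
//             CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
//                 fragment_graph,
//                 upstream_root_fragments,
//                 original_table_fragment.fragment_id,
//                 downstream_fragments,
//                 ddl_type,
//             )?
//         }
//         TableJobType::Unspecified => unreachable!(),
//     };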
fn build_helper( mut graph: StreamFragmentGraph, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 296f8de4d888f..3340e60bd6184 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -528,6 +528,7 @@ impl MetaClient { table: PbTable, graph: StreamFragmentGraph, table_col_index_mapping: ColIndexMapping, + job_type: PbTableJobType, ) -> Result { let request = ReplaceTablePlanRequest { plan: Some(ReplaceTablePlan { @@ -535,6 +536,7 @@ impl MetaClient { table: Some(table), fragment_graph: Some(graph), table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()), + job_type: job_type as _, }), }; let resp = self.inner.replace_table_plan(request).await?; From feec30e6f05bb6662fe25e9b6bef32530fcc1f5b Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 20 Jun 2024 19:24:39 +0800 Subject: [PATCH 02/18] finish alter add --- src/frontend/src/handler/alter_table_column.rs | 8 +++++--- src/frontend/src/handler/create_table.rs | 10 ++++++++-- src/meta/src/manager/catalog/mod.rs | 2 -- src/meta/src/rpc/ddl_controller.rs | 4 ++-- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 0e0e7d49f1108..54ea89a73c8cd 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -19,6 +19,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::ColumnCatalog; +use risingwave_common::range::RangeBoundsExt; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_sqlparser::ast::{ AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement, @@ -179,8 +180,10 @@ pub async fn handle_alter_table_column( ))? } - // Add the new column to the table definition. - columns.push(new_column.clone()); + // Add the new column to the table definition if it is not created by `create table (*)` syntax. + if !columns.is_empty() { + columns.push(new_column.clone()); + } columns_altered.extend(bind_sql_columns(vec![new_column].as_slice())?) } @@ -225,7 +228,6 @@ pub async fn handle_alter_table_column( _ => unreachable!(), }; - // Siyuan: 上面是把新增的column添加到column list里 replace_table_with_definition( &session, table_name, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 350a2053fc5d7..fd2984d44438b 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -730,6 +730,9 @@ fn gen_table_plan_inner( Ok((materialize.into(), table)) } +/// Generate stream plan for cdc table based on shared source. 
+/// In replace workflow, the `table_id` is the id of the table to be replaced +/// in create table workflow, the `table_id` is a placeholder will be filled in the Meta #[allow(clippy::too_many_arguments)] pub(crate) fn gen_create_table_plan_for_cdc_table( handler_args: HandlerArgs, @@ -746,6 +749,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( resolved_table_name: String, database_id: DatabaseId, schema_id: SchemaId, + table_id: TableId, ) -> Result<(PlanRef, PbTable)> { let context: OptimizerContextRef = OptimizerContext::new(handler_args, explain_options).into(); let session = context.session_ctx().clone(); @@ -783,8 +787,8 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( .collect(); let cdc_table_desc = CdcTableDesc { - table_id: TableId::placeholder(), // will be filled in meta node - source_id: source.id.into(), // id of cdc source streaming job + table_id, + source_id: source.id.into(), // id of cdc source streaming job external_table_name: external_table_name.clone(), pk: table_pk, columns: columns.iter().map(|c| c.column_desc.clone()).collect(), @@ -999,6 +1003,7 @@ pub(super) async fn handle_create_table_plan( resolved_table_name, database_id, schema_id, + TableId::placeholder(), )?; ((plan, None, table), TableJobType::SharedCdcSource) @@ -1323,6 +1328,7 @@ pub async fn generate_stream_graph_for_table( resolved_table_name, database_id, schema_id, + original_catalog.id(), )?; ((plan, None, table), TableJobType::SharedCdcSource) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 3aeab64bef128..1c58fe22b7b90 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -3338,8 +3338,6 @@ impl CatalogManager { database_core.ensure_database_id(table.database_id)?; database_core.ensure_schema_id(table.schema_id)?; - assert!(table.dependent_relations.is_empty()); - let key = (table.database_id, table.schema_id, table.name.clone()); let original_table = database_core .get_table(table.id) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 3f5592d4afa5f..fbeca8fa214cc 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -2018,6 +2018,7 @@ impl DdlController { ) })?; + // build complete graph based on the table job type let complete_graph = match table_job_type { TableJobType::General => CompleteStreamFragmentGraph::with_downstreams( fragment_graph, @@ -2027,7 +2028,7 @@ impl DdlController { )?, TableJobType::SharedCdcSource => { - // get the upstream fragment + // get the upstream fragment which should be the cdc source let upstream_root_fragments = self .metadata_manager .get_upstream_root_fragments(fragment_graph.dependent_table_ids()) @@ -2063,7 +2064,6 @@ impl DdlController { } = actor_graph_builder .generate_graph(&self.env, stream_job, expr_context) .await?; - assert!(dispatchers.is_empty()); // 3. 
Build the table fragments structure that will be persisted in the stream manager, and // the context that contains all information needed for building the actors on the compute From 6ba5f0cd1cd0664d402d2cfe4be7dfabc6366148 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 20 Jun 2024 19:57:21 +0800 Subject: [PATCH 03/18] finish alter drop --- src/frontend/src/handler/alter_table_column.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 54ea89a73c8cd..d20808c76e729 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -161,9 +161,9 @@ pub async fn handle_alter_table_column( // Duplicated names can actually be checked by `StreamMaterialize`. We do here for // better error reporting. let new_column_name = new_column.name.real_value(); - if columns + if columns_altered .iter() - .any(|c| c.name.real_value() == new_column_name) + .any(|c| c.name() == new_column_name.as_str()) { Err(ErrorCode::InvalidInputSyntax(format!( "column \"{new_column_name}\" of table \"{table_name}\" already exists" @@ -198,18 +198,20 @@ pub async fn handle_alter_table_column( // Locate the column by name and remove it. let column_name = column_name.real_value(); - let removed_column = columns - .extract_if(|c| c.name.real_value() == column_name) + let removed_column = columns_altered + .extract_if(|c| c.name() == column_name.as_str()) .at_most_one() .ok() .unwrap(); if removed_column.is_some() { // PASS - // retain untouch columns - columns_altered.retain(|col| { - !(col.name() == removed_column.as_ref().unwrap().name.real_value().as_str()) - }) + // remove from table definition + columns + .extract_if(|c| c.name.real_value() == column_name) + .at_most_one() + .ok() + .unwrap(); } else if if_exists { return Ok(PgResponse::builder(StatementType::ALTER_TABLE) .notice(format!( From 9faad96ecec728cd17845c2a1912bf9770e1e5e5 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 20 Jun 2024 20:13:31 +0800 Subject: [PATCH 04/18] minor --- e2e_test/source/cdc_inline/alter/postgres_alter.slt | 4 ++-- src/frontend/src/handler/alter_table_column.rs | 2 -- src/frontend/src/handler/create_table.rs | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/e2e_test/source/cdc_inline/alter/postgres_alter.slt b/e2e_test/source/cdc_inline/alter/postgres_alter.slt index 807b6a152af54..883dc669b03ca 100644 --- a/e2e_test/source/cdc_inline/alter/postgres_alter.slt +++ b/e2e_test/source/cdc_inline/alter/postgres_alter.slt @@ -37,7 +37,7 @@ INSERT INTO alter_test VALUES (3, 'c'); " # FIXME: after schema change in RisingWave, why does it take so long to get the new data? -sleep 20s +sleep 5s query ITT SELECT * FROM alter_test ORDER BY k @@ -72,7 +72,7 @@ INSERT INTO alter_test VALUES (5, 'ee'); " # FIXME: after schema change in RisingWave, why does it take so long to get the new data? -sleep 20s +sleep 5s query IT SELECT * FROM alter_test ORDER BY k diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index d20808c76e729..5974614140cc0 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -47,7 +47,6 @@ pub async fn replace_table_with_definition( // Create handler args as if we're creating a new table with the altered definition. 
let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?; let col_id_gen = ColumnIdGenerator::new_alter(original_catalog); - // TODO: We need to handle cdc table info here let Statement::CreateTable { columns, constraints, @@ -98,7 +97,6 @@ pub async fn replace_table_with_definition( let catalog_writer = session.catalog_writer()?; - // TODO: pass job_type to DdlService catalog_writer .replace_table(source, table, graph, col_index_mapping, job_type) .await?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index fd2984d44438b..116ef7c0067e5 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1324,7 +1324,7 @@ pub async fn generate_stream_graph_for_table( col_id_gen, on_conflict, with_version_column, - vec![], + vec![], // empty include options resolved_table_name, database_id, schema_id, From ff59a0d8b2cf40a84b15a8c1dce14190cc468e12 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 25 Jun 2024 12:26:41 +0800 Subject: [PATCH 05/18] add e2e test --- .../cdc/alter/cdc_table_alter.slt | 214 ++++++++++++++++++ src/meta/src/manager/catalog/mod.rs | 2 - 2 files changed, 214 insertions(+), 2 deletions(-) create mode 100644 e2e_test/source_inline/cdc/alter/cdc_table_alter.slt diff --git a/e2e_test/source_inline/cdc/alter/cdc_table_alter.slt b/e2e_test/source_inline/cdc/alter/cdc_table_alter.slt new file mode 100644 index 0000000000000..644ae5f96d58d --- /dev/null +++ b/e2e_test/source_inline/cdc/alter/cdc_table_alter.slt @@ -0,0 +1,214 @@ +control substitution on + +system ok +mysql -u root --protocol='tcp' -e " + SET GLOBAL time_zone = '+00:00'; +" + +system ok +mysql -u root --protocol='tcp' -e " + DROP DATABASE IF EXISTS testdb1; + CREATE DATABASE testdb1; + USE testdb1; + CREATE TABLE products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description VARCHAR(512) + ); + ALTER TABLE products AUTO_INCREMENT = 101; + INSERT INTO products + VALUES (default,'scooter','Small 2-wheel scooter'), + (default,'car battery','12V car battery'), + (default,'12-pack drill','12-pack of drill bits with sizes ranging from #40 to #3'), + (default,'hammer','12oz carpenter s hammer'), + (default,'hammer','14oz carpenter s hammer'), + (default,'hammer','16oz carpenter s hammer'), + (default,'rocks','box of assorted rocks'), + (default,'jacket','water resistent black wind breaker'), + (default,'spare tire','24 inch spare tire'); + CREATE TABLE orders ( + order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + order_date DATETIME NOT NULL, + customer_name VARCHAR(255) NOT NULL, + price DECIMAL(10, 5) NOT NULL, + product_id INTEGER NOT NULL, + order_status BOOLEAN NOT NULL + ) AUTO_INCREMENT = 10001; + INSERT INTO orders + VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), + (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false), + (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false); +" + +statement ok +create source mysql_source with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST:localhost}', + port = '${MYSQL_TCP_PORT:8306}', + username = 'dbz', + password = '123456', + database.name = 'testdb1', + server.id = '5185' +); + +statement ok +create table products_1 ( id INT, + name STRING, + description STRING, + PRIMARY KEY (id) +) from mysql_source table 'testdb1.products'; + +statement ok +create table orders_1 (*) from mysql_source table 'testdb1.orders'; + +system ok +psql -c " + DROP TABLE IF EXISTS shipments; + 
CREATE TABLE shipments ( + shipment_id SERIAL NOT NULL PRIMARY KEY, + order_id SERIAL NOT NULL, + origin VARCHAR(255) NOT NULL, + destination VARCHAR(255) NOT NULL, + is_arrived BOOLEAN NOT NULL + ); + ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; + INSERT INTO shipments + VALUES (default,10001,'Beijing','Shanghai',false), + (default,10002,'Hangzhou','Shanghai',false), + (default,10003,'Shanghai','Hangzhou',false); +" + +statement ok +create source pg_source with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + slot.name = 'cdc_alter_test' +); + +statement ok +create table shipments_1 ( + shipment_id INTEGER, + order_id INTEGER, + origin STRING, + destination STRING, + is_arrived boolean, + PRIMARY KEY (shipment_id) +) from pg_source table 'public.shipments'; + +# Create a mview join orders, products and shipments +statement ok +create materialized view enriched_orders as SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived + FROM orders_1 AS o + LEFT JOIN products_1 AS p ON o.product_id = p.id + LEFT JOIN shipments_1 AS s ON o.order_id = s.order_id; + + +sleep 3s + +query III +select order_id, product_id, shipment_id from enriched_orders order by order_id; +---- +10001 102 1001 +10002 105 1002 +10003 106 1003 + + +# alter mysql tables +system ok +mysql -u root --protocol='tcp' -e " + USE testdb1; + ALTER TABLE products ADD COLUMN weight DECIMAL(10, 2) NOT NULL DEFAULT 0.0; + ALTER TABLE orders ADD COLUMN order_comment VARCHAR(255); +" + +# alter cdc tables +statement ok +ALTER TABLE products_1 ADD COLUMN weight DECIMAL; + +statement ok +ALTER TABLE orders_1 ADD COLUMN order_comment VARCHAR; + +# wait alter ddl +sleep 3s + +query ITTT +SELECT id,name,description,weight FROM products_1 order by id limit 3 +---- +101 scooter Small 2-wheel scooter NULL +102 car battery 12V car battery NULL +103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL + + +# update mysql tables +system ok +mysql -u root --protocol='tcp' -e " + USE testdb1; + UPDATE products SET weight = 10.5 WHERE id = 101; + UPDATE products SET weight = 12.5 WHERE id = 102; + UPDATE orders SET order_comment = 'very good' WHERE order_id = 10001; +" + +sleep 3s + +query ITTT +SELECT id,name,description,weight FROM products_1 order by id limit 3 +---- +101 scooter Small 2-wheel scooter 10.50 +102 car battery 12V car battery 12.50 +103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL + +query ITTT +SELECT order_id,order_date,customer_name,product_id,order_status,order_comment FROM orders_1 order by order_id limit 2 +---- +10001 2020-07-30 10:08:22 Jark 102 0 very good +10002 2020-07-30 10:11:09 Sally 105 0 NULL + + +# alter mysql tables +system ok +mysql -u root --protocol='tcp' -e " + USE testdb1; + ALTER TABLE products DROP COLUMN weight; +" + +# alter cdc table to drop column +statement ok +ALTER TABLE products_1 DROP COLUMN weight; + +# wait alter ddl +sleep 3s + +query TTTT +describe products_1; +---- +id integer false NULL +name character varying false NULL +description character varying false NULL +primary key id NULL NULL +distribution key id NULL NULL +table description products_1 NULL NULL + + +# alter pg table +system ok +psql -c " + ALTER TABLE shipments DROP COLUMN destination; +" + +statement error +ALTER TABLE shipments_1 DROP COLUMN destination; + +# wait alter 
ddl +sleep 3s + +# query mv again +query III +select order_id, product_id, shipment_id from enriched_orders order by order_id; +---- +10001 102 1001 +10002 105 1002 +10003 106 1003 diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 1c58fe22b7b90..b12611b6918ef 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -3491,8 +3491,6 @@ impl CatalogManager { let database_core = &mut core.database; let key = (table.database_id, table.schema_id, table.name.clone()); - assert!(table.dependent_relations.is_empty()); - assert!( database_core.tables.contains_key(&table.id) && database_core.has_in_progress_creation(&key), From bb5b9adcdd3ec26075c2638b242b06d71a232155 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 25 Jun 2024 12:35:07 +0800 Subject: [PATCH 06/18] format --- src/frontend/src/handler/alter_table_column.rs | 1 - src/meta/src/rpc/ddl_controller.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 5974614140cc0..50d72a0407ee1 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -19,7 +19,6 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::ColumnCatalog; -use risingwave_common::range::RangeBoundsExt; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_sqlparser::ast::{ AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index fbeca8fa214cc..03214e9da1370 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -21,7 +21,7 @@ use std::time::Duration; use aes_siv::aead::generic_array::GenericArray; use aes_siv::aead::Aead; use aes_siv::{Aes128SivAead, KeyInit}; -use anyhow::{anyhow, Context}; +use anyhow::Context; use itertools::Itertools; use rand::{Rng, RngCore}; use risingwave_common::config::DefaultParallelism; From 7e8e936428c3972f33e4cb65f8a2769e55cc592b Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 25 Jun 2024 14:32:35 +0800 Subject: [PATCH 07/18] minor --- e2e_test/source_inline/cdc/alter/cdc_table_alter.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/cdc/alter/cdc_table_alter.slt b/e2e_test/source_inline/cdc/alter/cdc_table_alter.slt index 644ae5f96d58d..931756cf8860b 100644 --- a/e2e_test/source_inline/cdc/alter/cdc_table_alter.slt +++ b/e2e_test/source_inline/cdc/alter/cdc_table_alter.slt @@ -45,7 +45,7 @@ create source mysql_source with ( connector = 'mysql-cdc', hostname = '${MYSQL_HOST:localhost}', port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', + username = 'root', password = '123456', database.name = 'testdb1', server.id = '5185' From 44bebe41d3886585310235d7f248ed75331881ef Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 25 Jun 2024 15:01:47 +0800 Subject: [PATCH 08/18] minor --- ci/scripts/e2e-source-test.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 84fff651b547c..adb4c01187c6b 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -36,6 +36,9 @@ python3 -m pip install --break-system-packages requests protobuf fastavro conflu echo "--- e2e, inline 
test" RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ +export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test +export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 +createdb risedev ci-start ci-inline-source-test risedev slt './e2e_test/source_inline/**/*.slt' echo "--- Kill cluster" @@ -54,8 +57,6 @@ echo "--- e2e, ci-1cn-1fe, mysql & postgres cdc" mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cdc.sql # import data to postgres -export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test -createdb psql < ./e2e_test/source/cdc/postgres_cdc.sql echo "--- starting risingwave cluster" From 4b55d0749b9ca2574c0f307b541d6b60bb4eebc6 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 25 Jun 2024 15:50:37 +0800 Subject: [PATCH 09/18] fix --- ci/scripts/e2e-source-test.sh | 5 ++--- .../cdc_inline}/alter/cdc_table_alter.slt | 10 ++++++++++ e2e_test/source/cdc_inline/alter/postgres_alter.slt | 4 ++-- 3 files changed, 14 insertions(+), 5 deletions(-) rename e2e_test/{source_inline/cdc => source/cdc_inline}/alter/cdc_table_alter.slt (97%) diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index adb4c01187c6b..84fff651b547c 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -36,9 +36,6 @@ python3 -m pip install --break-system-packages requests protobuf fastavro conflu echo "--- e2e, inline test" RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ -export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test -export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 -createdb risedev ci-start ci-inline-source-test risedev slt './e2e_test/source_inline/**/*.slt' echo "--- Kill cluster" @@ -57,6 +54,8 @@ echo "--- e2e, ci-1cn-1fe, mysql & postgres cdc" mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cdc.sql # import data to postgres +export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test +createdb psql < ./e2e_test/source/cdc/postgres_cdc.sql echo "--- starting risingwave cluster" diff --git a/e2e_test/source_inline/cdc/alter/cdc_table_alter.slt b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt similarity index 97% rename from e2e_test/source_inline/cdc/alter/cdc_table_alter.slt rename to e2e_test/source/cdc_inline/alter/cdc_table_alter.slt index 931756cf8860b..d2a0abaeb4b70 100644 --- a/e2e_test/source_inline/cdc/alter/cdc_table_alter.slt +++ b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt @@ -212,3 +212,13 @@ select order_id, product_id, shipment_id from enriched_orders order by order_id; 10001 102 1001 10002 105 1002 10003 106 1003 + +statement ok +drop materialized view enriched_orders; + +statement ok +drop source mysql_source cascade; + +statement ok +drop source pg_source cascade; + diff --git a/e2e_test/source/cdc_inline/alter/postgres_alter.slt b/e2e_test/source/cdc_inline/alter/postgres_alter.slt index 883dc669b03ca..807b6a152af54 100644 --- a/e2e_test/source/cdc_inline/alter/postgres_alter.slt +++ b/e2e_test/source/cdc_inline/alter/postgres_alter.slt @@ -37,7 +37,7 @@ INSERT INTO alter_test VALUES (3, 'c'); " # FIXME: after schema change in RisingWave, why does it take so long to get the new data? 
-sleep 5s +sleep 20s query ITT SELECT * FROM alter_test ORDER BY k @@ -72,7 +72,7 @@ INSERT INTO alter_test VALUES (5, 'ee'); " # FIXME: after schema change in RisingWave, why does it take so long to get the new data? -sleep 5s +sleep 20s query IT SELECT * FROM alter_test ORDER BY k From ce31e20c4830373f5f02bc7d53b57113cdf181b8 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 25 Jun 2024 19:40:31 +0800 Subject: [PATCH 10/18] fix --- e2e_test/source/cdc/postgres_cdc.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 6579bc2683037..43a120a19d50f 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -1,4 +1,5 @@ -- PG +DROP TABLE IF EXISTS shipments; CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, From c7dab61ec5c40190c109bc114d0dbe76fca50015 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 25 Jun 2024 20:13:42 +0800 Subject: [PATCH 11/18] fix final --- .../source/cdc_inline/alter/cdc_table_alter.slt | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt index d2a0abaeb4b70..ff9cb85cbc361 100644 --- a/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt +++ b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt @@ -63,16 +63,16 @@ create table orders_1 (*) from mysql_source table 'testdb1.orders'; system ok psql -c " - DROP TABLE IF EXISTS shipments; - CREATE TABLE shipments ( + DROP TABLE IF EXISTS shipments1; + CREATE TABLE shipments1 ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); - ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; - INSERT INTO shipments + ALTER SEQUENCE public.shipments1_shipment_id_seq RESTART WITH 1001; + INSERT INTO shipments1 VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false); @@ -97,7 +97,7 @@ create table shipments_1 ( destination STRING, is_arrived boolean, PRIMARY KEY (shipment_id) -) from pg_source table 'public.shipments'; +) from pg_source table 'public.shipments1'; # Create a mview join orders, products and shipments statement ok @@ -196,7 +196,7 @@ table description products_1 NULL NULL # alter pg table system ok psql -c " - ALTER TABLE shipments DROP COLUMN destination; + ALTER TABLE shipments1 DROP COLUMN destination; " statement error @@ -221,4 +221,3 @@ drop source mysql_source cascade; statement ok drop source pg_source cascade; - From 2c93b06c03f23e6d08e684f04256b39d162b9e78 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 3 Jul 2024 22:14:30 +0800 Subject: [PATCH 12/18] fix comments --- .../cdc_inline/alter/cdc_table_alter.slt | 37 ++++++++++++++----- .../src/handler/alter_table_column.rs | 31 +++++++--------- src/frontend/src/handler/create_sink.rs | 1 - src/frontend/src/handler/create_table.rs | 21 ++++++----- 4 files changed, 52 insertions(+), 38 deletions(-) diff --git a/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt index ff9cb85cbc361..eeeeb86610bee 100644 --- a/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt +++ b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt @@ -1,12 +1,14 @@ control substitution on +# mysql env vars will be read from the 
`.risingwave/config/risedev-env` file + system ok -mysql -u root --protocol='tcp' -e " +mysql -e " SET GLOBAL time_zone = '+00:00'; " system ok -mysql -u root --protocol='tcp' -e " +mysql -e " DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1; USE testdb1; @@ -43,10 +45,10 @@ mysql -u root --protocol='tcp' -e " statement ok create source mysql_source with ( connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', + hostname = '${MYSQL_HOST}', + port = '${MYSQL_TCP_PORT}', username = 'root', - password = '123456', + password = '${MYSQL_PWD}', database.name = 'testdb1', server.id = '5185' ); @@ -59,7 +61,15 @@ create table products_1 ( id INT, ) from mysql_source table 'testdb1.products'; statement ok -create table orders_1 (*) from mysql_source table 'testdb1.orders'; +create table orders_1 ( + order_id int, + order_date timestamp, + customer_name string, + price decimal, + product_id int, + order_status smallint, + PRIMARY KEY (order_id) +) from mysql_source table 'testdb1.orders'; system ok psql -c " @@ -145,7 +155,7 @@ SELECT id,name,description,weight FROM products_1 order by id limit 3 # update mysql tables system ok -mysql -u root --protocol='tcp' -e " +mysql -e " USE testdb1; UPDATE products SET weight = 10.5 WHERE id = 101; UPDATE products SET weight = 12.5 WHERE id = 102; @@ -170,7 +180,7 @@ SELECT order_id,order_date,customer_name,product_id,order_status,order_comment F # alter mysql tables system ok -mysql -u root --protocol='tcp' -e " +mysql -e " USE testdb1; ALTER TABLE products DROP COLUMN weight; " @@ -199,7 +209,7 @@ psql -c " ALTER TABLE shipments1 DROP COLUMN destination; " -statement error +statement error unable to drop the column due to being referenced by downstream materialized views or sinks ALTER TABLE shipments_1 DROP COLUMN destination; # wait alter ddl @@ -216,6 +226,15 @@ select order_id, product_id, shipment_id from enriched_orders order by order_id; statement ok drop materialized view enriched_orders; +statement ok +drop table orders_1; + +statement ok +create table orders_2 (*) from mysql_source table 'testdb1.orders'; + +statement error Not supported: alter a table with empty column definitions +ALTER TABLE orders_2 ADD COLUMN order_comment VARCHAR; + statement ok drop source mysql_source cascade; diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 50d72a0407ee1..a24de72959ea1 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -19,6 +19,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::ColumnCatalog; +use risingwave_common::range::RangeBoundsExt; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_sqlparser::ast::{ AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement, @@ -40,7 +41,6 @@ pub async fn replace_table_with_definition( table_name: ObjectName, definition: Statement, original_catalog: &Arc, - columns_altered: Vec, source_schema: Option, ) -> Result<()> { // Create handler args as if we're creating a new table with the altered definition. 
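// Note: the "fix comments" patch also adds a guard a few hunks below: a table created with the
// star syntax (CREATE TABLE t (*) FROM source TABLE '...') carries no column definitions in its
// stored SQL, so ALTER TABLE on it is rejected with
// ErrorCode::NotSupported("alter a table with empty column definitions", ...), which is exactly
// what the new `orders_test` case in the e2e script above asserts.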
@@ -69,7 +69,6 @@ pub async fn replace_table_with_definition( handler_args, col_id_gen, columns, - columns_altered, wildcard_idx, constraints, source_watermarks, @@ -150,7 +149,13 @@ pub async fn handle_alter_table_column( } } - let mut columns_altered = original_catalog.columns.clone(); + if columns.is_empty() { + Err(ErrorCode::NotSupported( + "alter a table with empty column definitions".to_string(), + "Please recreate the table with column definitions.".to_string(), + ))? + } + match operation { AlterTableOperation::AddColumn { column_def: new_column, @@ -158,9 +163,9 @@ pub async fn handle_alter_table_column( // Duplicated names can actually be checked by `StreamMaterialize`. We do here for // better error reporting. let new_column_name = new_column.name.real_value(); - if columns_altered + if columns .iter() - .any(|c| c.name() == new_column_name.as_str()) + .any(|c| c.name.real_value() == new_column_name) { Err(ErrorCode::InvalidInputSyntax(format!( "column \"{new_column_name}\" of table \"{table_name}\" already exists" @@ -178,10 +183,7 @@ pub async fn handle_alter_table_column( } // Add the new column to the table definition if it is not created by `create table (*)` syntax. - if !columns.is_empty() { - columns.push(new_column.clone()); - } - columns_altered.extend(bind_sql_columns(vec![new_column].as_slice())?) + columns.push(new_column); } AlterTableOperation::DropColumn { @@ -195,20 +197,14 @@ pub async fn handle_alter_table_column( // Locate the column by name and remove it. let column_name = column_name.real_value(); - let removed_column = columns_altered - .extract_if(|c| c.name() == column_name.as_str()) + let removed_column = columns + .extract_if(|c| c.name.real_value() == column_name) .at_most_one() .ok() .unwrap(); if removed_column.is_some() { // PASS - // remove from table definition - columns - .extract_if(|c| c.name.real_value() == column_name) - .at_most_one() - .ok() - .unwrap(); } else if if_exists { return Ok(PgResponse::builder(StatementType::ALTER_TABLE) .notice(format!( @@ -232,7 +228,6 @@ pub async fn handle_alter_table_column( table_name, definition, &original_catalog, - columns_altered, source_schema, ) .await?; diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 73f33308804dc..a9fc1c96b8903 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -643,7 +643,6 @@ pub(crate) async fn reparse_table_for_sink( handler_args, col_id_gen, columns, - vec![], wildcard_idx, constraints, source_watermarks, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 116ef7c0067e5..ff714be660400 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1230,8 +1230,7 @@ pub async fn generate_stream_graph_for_table( source_schema: Option, handler_args: HandlerArgs, col_id_gen: ColumnIdGenerator, - columns: Vec, - column_catalogs: Vec, + column_defs: Vec, wildcard_idx: Option, constraints: Vec, source_watermarks: Vec, @@ -1249,7 +1248,7 @@ pub async fn generate_stream_graph_for_table( handler_args, ExplainOptions::default(), table_name, - columns, + column_defs, wildcard_idx, constraints, source_schema, @@ -1268,7 +1267,7 @@ pub async fn generate_stream_graph_for_table( let (plan, table) = gen_create_table_plan( context, table_name, - columns, + column_defs, constraints, col_id_gen, source_watermarks, @@ -1307,18 +1306,20 @@ pub async fn generate_stream_graph_for_table( 
cdc_table.external_table_name.clone(), )?; - let pk_names = original_catalog - .stream_key - .iter() - .map(|pk_index| original_catalog.columns[*pk_index].name().to_string()) - .collect(); + let (columns, pk_names) = derive_schema_for_cdc_table( + &column_defs, + &constraints, + connect_properties.clone(), + false, + ) + .await?; let (plan, table) = gen_create_table_plan_for_cdc_table( handler_args, ExplainOptions::default(), source, cdc_table.external_table_name.clone(), - column_catalogs, + columns, pk_names, connect_properties, col_id_gen, From fb08929cdd374ef1b10e924eb244f897b74f483d Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 3 Jul 2024 22:36:29 +0800 Subject: [PATCH 13/18] minor --- src/frontend/src/handler/alter_table_column.rs | 4 +--- src/frontend/src/handler/alter_table_with_sr.rs | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index a24de72959ea1..5e33c5f0d1497 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -18,8 +18,6 @@ use anyhow::Context; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; -use risingwave_common::catalog::ColumnCatalog; -use risingwave_common::range::RangeBoundsExt; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_sqlparser::ast::{ AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement, @@ -27,7 +25,7 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::Parser; use super::create_source::get_json_schema_location; -use super::create_table::{bind_sql_columns, generate_stream_graph_for_table, ColumnIdGenerator}; +use super::create_table::{generate_stream_graph_for_table, ColumnIdGenerator}; use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; diff --git a/src/frontend/src/handler/alter_table_with_sr.rs b/src/frontend/src/handler/alter_table_with_sr.rs index 3e784d6255d59..c1700c36a3c90 100644 --- a/src/frontend/src/handler/alter_table_with_sr.rs +++ b/src/frontend/src/handler/alter_table_with_sr.rs @@ -86,7 +86,6 @@ pub async fn handle_refresh_schema( table_name, definition, &original_table, - vec![], Some(connector_schema), ) .await?; From 56ad830c12fd7874e11b7fb3bdd02edd1470e611 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 4 Jul 2024 08:28:33 +0800 Subject: [PATCH 14/18] minor --- .../cdc_inline/alter/cdc_table_alter.slt | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt index eeeeb86610bee..ed1110dc9ea01 100644 --- a/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt +++ b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt @@ -54,14 +54,14 @@ create source mysql_source with ( ); statement ok -create table products_1 ( id INT, +create table my_products ( id INT, name STRING, description STRING, PRIMARY KEY (id) ) from mysql_source table 'testdb1.products'; statement ok -create table orders_1 ( +create table my_orders ( order_id int, order_date timestamp, customer_name string, @@ -100,7 +100,7 @@ create source pg_source with ( ); statement ok -create table shipments_1 ( +create table pg_shipments ( shipment_id INTEGER, order_id INTEGER, origin STRING, @@ -112,9 +112,9 @@ create table shipments_1 ( # 
Create a mview join orders, products and shipments statement ok create materialized view enriched_orders as SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived - FROM orders_1 AS o - LEFT JOIN products_1 AS p ON o.product_id = p.id - LEFT JOIN shipments_1 AS s ON o.order_id = s.order_id; + FROM my_orders AS o + LEFT JOIN my_products AS p ON o.product_id = p.id + LEFT JOIN pg_shipments AS s ON o.order_id = s.order_id; sleep 3s @@ -137,16 +137,16 @@ mysql -u root --protocol='tcp' -e " # alter cdc tables statement ok -ALTER TABLE products_1 ADD COLUMN weight DECIMAL; +ALTER TABLE my_products ADD COLUMN weight DECIMAL; statement ok -ALTER TABLE orders_1 ADD COLUMN order_comment VARCHAR; +ALTER TABLE my_orders ADD COLUMN order_comment VARCHAR; # wait alter ddl sleep 3s query ITTT -SELECT id,name,description,weight FROM products_1 order by id limit 3 +SELECT id,name,description,weight FROM my_products order by id limit 3 ---- 101 scooter Small 2-wheel scooter NULL 102 car battery 12V car battery NULL @@ -165,14 +165,14 @@ mysql -e " sleep 3s query ITTT -SELECT id,name,description,weight FROM products_1 order by id limit 3 +SELECT id,name,description,weight FROM my_products order by id limit 3 ---- 101 scooter Small 2-wheel scooter 10.50 102 car battery 12V car battery 12.50 103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL query ITTT -SELECT order_id,order_date,customer_name,product_id,order_status,order_comment FROM orders_1 order by order_id limit 2 +SELECT order_id,order_date,customer_name,product_id,order_status,order_comment FROM my_orders order by order_id limit 2 ---- 10001 2020-07-30 10:08:22 Jark 102 0 very good 10002 2020-07-30 10:11:09 Sally 105 0 NULL @@ -187,20 +187,20 @@ mysql -e " # alter cdc table to drop column statement ok -ALTER TABLE products_1 DROP COLUMN weight; +ALTER TABLE my_products DROP COLUMN weight; # wait alter ddl sleep 3s query TTTT -describe products_1; +describe my_products; ---- id integer false NULL name character varying false NULL description character varying false NULL primary key id NULL NULL distribution key id NULL NULL -table description products_1 NULL NULL +table description my_products NULL NULL # alter pg table @@ -210,7 +210,7 @@ psql -c " " statement error unable to drop the column due to being referenced by downstream materialized views or sinks -ALTER TABLE shipments_1 DROP COLUMN destination; +ALTER TABLE pg_shipments DROP COLUMN destination; # wait alter ddl sleep 3s @@ -227,13 +227,13 @@ statement ok drop materialized view enriched_orders; statement ok -drop table orders_1; +drop table my_orders; statement ok -create table orders_2 (*) from mysql_source table 'testdb1.orders'; +create table orders_test (*) from mysql_source table 'testdb1.orders'; statement error Not supported: alter a table with empty column definitions -ALTER TABLE orders_2 ADD COLUMN order_comment VARCHAR; +ALTER TABLE orders_test ADD COLUMN order_comment VARCHAR; statement ok drop source mysql_source cascade; From e2d392ca8b0d234b9a8fdeb323c89801e4db384c Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 5 Jul 2024 00:03:45 +0800 Subject: [PATCH 15/18] refactor --- src/frontend/src/handler/create_table.rs | 202 ++++++++++++----------- src/meta/src/manager/catalog/mod.rs | 9 +- 2 files changed, 113 insertions(+), 98 deletions(-) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 35a83b5a3cbbd..2292a3c5f7938 100644 --- 
a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1234,106 +1234,88 @@ pub async fn generate_stream_graph_for_table( ) -> Result<(StreamFragmentGraph, Table, Option, TableJobType)> { use risingwave_pb::catalog::table::OptionalAssociatedSourceId; - let ((plan, source, table), job_type) = - match (source_schema, cdc_table_info.as_ref()) { - (Some(source_schema), None) => ( - gen_create_table_plan_with_source( - handler_args, - ExplainOptions::default(), - table_name, - column_defs, - wildcard_idx, - constraints, - source_schema, - source_watermarks, - col_id_gen, - append_only, - on_conflict, - with_version_column, - vec![], - ) - .await?, - TableJobType::General, - ), - (None, None) => { - let context = OptimizerContext::from_handler_args(handler_args); - let (plan, table) = gen_create_table_plan( - context, - table_name, - column_defs, - constraints, - col_id_gen, - source_watermarks, - append_only, - on_conflict, - with_version_column, - )?; - ((plan, None, table), TableJobType::General) - } - (None, Some(cdc_table)) => { - let session = &handler_args.session; - let db_name = session.database(); - let (schema_name, resolved_table_name) = - Binder::resolve_schema_qualified_name(db_name, table_name)?; - let (database_id, schema_id) = - session.get_database_and_schema_id_for_create(schema_name.clone())?; - - let (source_schema, source_name) = - Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?; - - let source = { - let catalog_reader = session.env().catalog_reader().read_guard(); - let schema_name = source_schema - .clone() - .unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); - let (source, _) = catalog_reader.get_source_by_name( - db_name, - SchemaPath::Name(schema_name.as_str()), - source_name.as_str(), - )?; - source.clone() - }; - - let connect_properties = derive_connect_properties( - &source.with_properties, - cdc_table.external_table_name.clone(), - )?; - - let (columns, pk_names) = derive_schema_for_cdc_table( - &column_defs, - &constraints, - connect_properties.clone(), - false, - ) - .await?; - - let (plan, table) = gen_create_table_plan_for_cdc_table( - handler_args, - ExplainOptions::default(), - source, - cdc_table.external_table_name.clone(), - columns, - pk_names, - connect_properties, - col_id_gen, - on_conflict, - with_version_column, - vec![], // empty include options - resolved_table_name, - database_id, - schema_id, - original_catalog.id(), - )?; - - ((plan, None, table), TableJobType::SharedCdcSource) - } - (Some(_), Some(_)) => return Err(ErrorCode::NotSupported( + let ((plan, source, table), job_type) = match (source_schema, cdc_table_info.as_ref()) { + (Some(source_schema), None) => ( + gen_create_table_plan_with_source( + handler_args, + ExplainOptions::default(), + table_name, + column_defs, + wildcard_idx, + constraints, + source_schema, + source_watermarks, + col_id_gen, + append_only, + on_conflict, + with_version_column, + vec![], + ) + .await?, + TableJobType::General, + ), + (None, None) => { + let context = OptimizerContext::from_handler_args(handler_args); + let (plan, table) = gen_create_table_plan( + context, + table_name, + column_defs, + constraints, + col_id_gen, + source_watermarks, + append_only, + on_conflict, + with_version_column, + )?; + ((plan, None, table), TableJobType::General) + } + (None, Some(cdc_table)) => { + let session = &handler_args.session; + let (source, resolved_table_name, database_id, schema_id) = + get_source_and_resolved_table_name(session, cdc_table.clone(), 
table_name.clone())?; + + let connect_properties = derive_connect_properties( + &source.with_properties, + cdc_table.external_table_name.clone(), + )?; + + let (columns, pk_names) = derive_schema_for_cdc_table( + &column_defs, + &constraints, + connect_properties.clone(), + false, + ) + .await?; + + let (plan, table) = gen_create_table_plan_for_cdc_table( + handler_args, + ExplainOptions::default(), + source, + cdc_table.external_table_name.clone(), + columns, + pk_names, + connect_properties, + col_id_gen, + on_conflict, + with_version_column, + vec![], // empty include options + resolved_table_name, + database_id, + schema_id, + original_catalog.id(), + )?; + + ((plan, None, table), TableJobType::SharedCdcSource) + } + (Some(_), Some(_)) => { + return Err(ErrorCode::NotSupported( "Data format and encoding format doesn't apply to table created from a CDC source" .into(), "Remove the FORMAT and ENCODE specification".into(), ) - .into()), - }; + .into()) + } + }; // TODO: avoid this backward conversion. if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() { @@ -1364,6 +1346,34 @@ pub async fn generate_stream_graph_for_table( Ok((graph, table, source, job_type)) } +fn get_source_and_resolved_table_name( + session: &Arc, + cdc_table: CdcTableInfo, + table_name: ObjectName, +) -> Result<(Arc, String, DatabaseId, SchemaId)> { + let db_name = session.database(); + let (schema_name, resolved_table_name) = + Binder::resolve_schema_qualified_name(db_name, table_name)?; + let (database_id, schema_id) = + session.get_database_and_schema_id_for_create(schema_name.clone())?; + + let (source_schema, source_name) = + Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?; + + let source = { + let catalog_reader = session.env().catalog_reader().read_guard(); + let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); + let (source, _) = catalog_reader.get_source_by_name( + db_name, + SchemaPath::Name(schema_name.as_str()), + source_name.as_str(), + )?; + source.clone() + }; + + Ok((source, resolved_table_name, database_id, schema_id)) +} + #[cfg(test)] mod tests { use risingwave_common::catalog::{Field, DEFAULT_DATABASE_NAME, ROWID_PREFIX}; diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 8414dff8c1bd0..0bb9967c89c6f 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -38,7 +38,7 @@ use risingwave_pb::catalog::{ Comment, Connection, CreateType, Database, Function, Index, PbSource, PbStreamJobStatus, Schema, Secret, Sink, Source, StreamJobStatus, Subscription, Table, View, }; -use risingwave_pb::ddl_service::{alter_owner_request, alter_set_schema_request}; +use risingwave_pb::ddl_service::{alter_owner_request, alter_set_schema_request, TableJobType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::user::grant_privilege::{Action, ActionWithGrantOption, Object}; use risingwave_pb::user::update_user_request::UpdateField; @@ -3412,7 +3412,7 @@ impl CatalogManager { /// This is used for `ALTER TABLE ADD/DROP COLUMN`. pub async fn start_replace_table_procedure(&self, stream_job: &StreamingJob) -> MetaResult<()> { - let StreamingJob::Table(source, table, ..) 
= stream_job else { + let StreamingJob::Table(source, table, job_type) = stream_job else { unreachable!("unexpected job: {stream_job:?}") }; let core = &mut *self.core.lock().await; @@ -3420,6 +3420,11 @@ impl CatalogManager { database_core.ensure_database_id(table.database_id)?; database_core.ensure_schema_id(table.schema_id)?; + // general table streaming job should not have dependent relations + if matches!(job_type, TableJobType::General) { + assert!(table.dependent_relations.is_empty()); + } + let key = (table.database_id, table.schema_id, table.name.clone()); let original_table = database_core .get_table(table.id) From 46d224b0d0f431678745d5fb4e9ba1fd86004234 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 5 Jul 2024 11:23:06 +0800 Subject: [PATCH 16/18] fix --- src/meta/src/rpc/ddl_controller.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 2cc765069c25d..f984894fa6a89 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -2001,6 +2001,11 @@ impl DdlController { .generate_graph(&self.env, stream_job, expr_context) .await?; + // general table job type does not have upstream job, so the dispatchers should be empty + if matches!(table_job_type, TableJobType::General) { + assert!(dispatchers.is_empty()); + } + // 3. Build the table fragments structure that will be persisted in the stream manager, and // the context that contains all information needed for building the actors on the compute // nodes. From 4d325acdb4728ec2197968b72a686e38f7ece54e Mon Sep 17 00:00:00 2001 From: StrikeW Date: Sun, 7 Jul 2024 17:36:19 +0800 Subject: [PATCH 17/18] fix alter table during cdc backfill --- .../src/executor/backfill/cdc/cdc_backfill.rs | 81 ++++++++++++++----- 1 file changed, 63 insertions(+), 18 deletions(-) diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 4e564130accbb..59686f4bb8fdd 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -42,6 +42,7 @@ use crate::executor::backfill::utils::{ use crate::executor::backfill::CdcScanOptions; use crate::executor::monitor::CdcBackfillMetrics; use crate::executor::prelude::*; +use crate::executor::UpdateMutation; use crate::task::CreateMviewProgress; /// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each. @@ -135,7 +136,7 @@ impl CdcBackfillExecutor { let pk_indices = self.external_table.pk_indices().to_vec(); let pk_order = self.external_table.pk_order_types().to_vec(); - let upstream_table_id = self.external_table.table_id().table_id; + let table_id = self.external_table.table_id().table_id; let upstream_table_name = self.external_table.qualified_table_name(); let schema_table_name = self.external_table.schema_table_name().clone(); let external_database_name = self.external_table.database_name().to_owned(); @@ -157,7 +158,7 @@ impl CdcBackfillExecutor { // Poll the upstream to get the first barrier. let first_barrier = expect_first_barrier(&mut upstream).await?; - let mut paused = first_barrier.is_pause_on_startup(); + let mut is_snapshot_paused = first_barrier.is_pause_on_startup(); // Check whether this parallelism has been assigned splits, // if not, we should bypass the backfill directly. 
@@ -192,11 +193,12 @@ impl CdcBackfillExecutor { let mut consumed_binlog_offset: Option = None; tracing::info!( - upstream_table_id, + table_id, upstream_table_name, initial_binlog_offset = ?last_binlog_offset, ?current_pk_pos, is_finished = state.is_finished, + is_snapshot_paused, snapshot_row_count = total_snapshot_row_count, rate_limit = self.rate_limit_rps, disable_backfill = self.options.disable_backfill, @@ -234,6 +236,27 @@ impl CdcBackfillExecutor { for msg in upstream.by_ref() { match msg? { Message::Barrier(barrier) => { + match barrier.mutation.as_deref() { + Some(crate::executor::Mutation::Pause) => { + is_snapshot_paused = true; + tracing::info!( + table_id, + upstream_table_name, + "snapshot is paused by barrier" + ); + } + Some(crate::executor::Mutation::Resume) => { + is_snapshot_paused = false; + tracing::info!( + table_id, + upstream_table_name, + "snapshot is resumed by barrier" + ); + } + _ => { + // ignore other mutations + } + } // commit state just to bump the epoch of state table state_impl.commit_state(barrier.epoch).await?; yield Message::Barrier(barrier); @@ -248,10 +271,11 @@ impl CdcBackfillExecutor { } } - tracing::info!(upstream_table_id, + tracing::info!(table_id, upstream_table_name, initial_binlog_offset = ?last_binlog_offset, ?current_pk_pos, + is_snapshot_paused, "start cdc backfill loop"); // the buffer will be drained when a barrier comes @@ -274,9 +298,9 @@ impl CdcBackfillExecutor { .snapshot_read_full_table(read_args, self.options.snapshot_batch_size) .map(Either::Right)); - let (right_snapshot, valve) = pausable(right_snapshot); - if paused { - valve.pause(); + let (right_snapshot, snapshot_valve) = pausable(right_snapshot); + if is_snapshot_paused { + snapshot_valve.pause(); } // Prefer to select upstream, so we can stop snapshot stream when barrier comes. @@ -306,12 +330,12 @@ impl CdcBackfillExecutor { use crate::executor::Mutation; match mutation { Mutation::Pause => { - paused = true; - valve.pause(); + is_snapshot_paused = true; + snapshot_valve.pause(); } Mutation::Resume => { - paused = false; - valve.resume(); + is_snapshot_paused = false; + snapshot_valve.resume(); } Mutation::Throttle(some) => { if let Some(new_rate_limit) = @@ -323,6 +347,21 @@ impl CdcBackfillExecutor { continue 'backfill_loop; } } + Mutation::Update(UpdateMutation { + dropped_actors, + .. + }) => { + if dropped_actors.contains(&self.actor_ctx.id) { + // the actor has been dropped, exit the backfill loop + tracing::info!( + table_id, + upstream_table_name, + "CdcBackfill has been dropped due to config change" + ); + yield Message::Barrier(barrier); + break 'backfill_loop; + } + } _ => (), } } @@ -339,7 +378,7 @@ impl CdcBackfillExecutor { // staging the barrier pending_barrier = Some(barrier); tracing::debug!( - upstream_table_id, + table_id, ?current_pk_pos, ?snapshot_read_row_cnt, "Prepare to start a new snapshot" @@ -406,7 +445,7 @@ impl CdcBackfillExecutor { match msg? { None => { tracing::info!( - upstream_table_id, + table_id, ?last_binlog_offset, ?current_pk_pos, "snapshot read stream ends" @@ -457,14 +496,20 @@ impl CdcBackfillExecutor { // Otherwise, the result set of the new snapshot stream may become empty. // It maybe a cancellation bug of the mysql driver. 
let (_, mut snapshot_stream) = backfill_stream.into_inner(); - if let Some(msg) = snapshot_stream.next().await { + + if !is_snapshot_paused + && let Some(msg) = snapshot_stream + .next() + .instrument_await("consume_snapshot_stream_once") + .await + { let Either::Right(msg) = msg else { bail!("BUG: snapshot_read contains upstream messages"); }; match msg? { None => { tracing::info!( - upstream_table_id, + table_id, ?last_binlog_offset, ?current_pk_pos, "snapshot read stream ends in the force emit branch" @@ -501,7 +546,7 @@ impl CdcBackfillExecutor { snapshot_read_row_cnt += row_count as usize; tracing::debug!( - upstream_table_id, + table_id, ?current_pk_pos, ?snapshot_read_row_cnt, "force emit a snapshot chunk" @@ -567,7 +612,7 @@ impl CdcBackfillExecutor { } else if self.options.disable_backfill { // If backfill is disabled, we just mark the backfill as finished tracing::info!( - upstream_table_id, + table_id, upstream_table_name, "CdcBackfill has been disabled" ); @@ -585,7 +630,7 @@ impl CdcBackfillExecutor { drop(upstream_table_reader); tracing::info!( - upstream_table_id, + table_id, upstream_table_name, "CdcBackfill has already finished and will forward messages directly to the downstream" ); From 3e56a60f97b288597e7b2323f172e862e8f86972 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Sun, 7 Jul 2024 18:00:34 +0800 Subject: [PATCH 18/18] minor --- e2e_test/source/cdc_inline/alter/cdc_table_alter.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt index ed1110dc9ea01..6bea5dce2fe45 100644 --- a/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt +++ b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt @@ -129,7 +129,7 @@ select order_id, product_id, shipment_id from enriched_orders order by order_id; # alter mysql tables system ok -mysql -u root --protocol='tcp' -e " +mysql -e " USE testdb1; ALTER TABLE products ADD COLUMN weight DECIMAL(10, 2) NOT NULL DEFAULT 0.0; ALTER TABLE orders ADD COLUMN order_comment VARCHAR(255);
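
Taken together, patches 15 and 16 key the replace-table invariants off the new TableJobType: a General table replace carries no dependent relations and receives no upstream dispatchers, while a SharedCdcSource replace is fed by the shared CDC source upstream. The standalone sketch below illustrates that dispatch with simplified stand-in structs (ReplacePlan and its fields are hypothetical; only the TableJobType variants and the emptiness checks mirror the actual patches):

    // Simplified stand-ins for illustration only; the real code keeps the job type on
    // `StreamingJob::Table` / `ReplaceTablePlan` and asserts these invariants in the meta node.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum TableJobType {
        General,
        SharedCdcSource,
    }

    struct ReplacePlan {
        job_type: TableJobType,
        // Upstream dispatchers into the replaced table fragment, if any.
        upstream_dispatchers: Vec<String>,
    }

    fn validate_replace_plan(plan: &ReplacePlan) {
        // A plain `ALTER TABLE` replace has no upstream job, so nothing should dispatch into it.
        if plan.job_type == TableJobType::General {
            assert!(plan.upstream_dispatchers.is_empty());
        }
    }

    fn main() {
        // A CDC table replace is driven by the shared source fragment upstream.
        let cdc_plan = ReplacePlan {
            job_type: TableJobType::SharedCdcSource,
            upstream_dispatchers: vec!["shared cdc source -> table".to_string()],
        };
        validate_replace_plan(&cdc_plan);

        // A general table replace stands alone.
        let general_plan = ReplacePlan {
            job_type: TableJobType::General,
            upstream_dispatchers: Vec::new(),
        };
        validate_replace_plan(&general_plan);
    }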