diff --git a/e2e_test/error_ui/simple/main.slt b/e2e_test/error_ui/simple/main.slt index f09e47302f3c6..23f136c7cc032 100644 --- a/e2e_test/error_ui/simple/main.slt +++ b/e2e_test/error_ui/simple/main.slt @@ -19,19 +19,8 @@ Caused by these errors (recent errors listed first): 2: invalid IPv4 address -statement error +statement error Failed to run the query create function int_42() returns int as int_42 using link '55.55.55.55:5555'; ----- -db error: ERROR: Failed to run the query - -Caused by these errors (recent errors listed first): - 1: failed to check UDF signature - 2: failed to send requests to UDF service - 3: status: Unavailable, message: "error trying to connect: tcp connect error: deadline has elapsed", details: [], metadata: MetadataMap { headers: {} } - 4: transport error - 5: error trying to connect - 6: tcp connect error - 7: deadline has elapsed statement error diff --git a/e2e_test/source/cdc_inline/auto_schema_change_mysql.slt b/e2e_test/source/cdc_inline/auto_schema_change_mysql.slt new file mode 100644 index 0000000000000..31bb9d1b0421b --- /dev/null +++ b/e2e_test/source/cdc_inline/auto_schema_change_mysql.slt @@ -0,0 +1,69 @@ +control substitution on + +system ok +mysql -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;" + +system ok +mysql -e " + USE mytest; + DROP TABLE IF EXISTS customers; + CREATE TABLE customers( + id BIGINT PRIMARY KEY, + modified DATETIME, + custinfo JSON + ); + ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) ); + " + +statement ok +create source mysql_source with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST:localhost}', + port = '${MYSQL_TCP_PORT:8306}', + username = 'root', + password = '${MYSQL_PWD:}', + database.name = 'mytest', + server.id = '5701', + auto.schema.change = 'true' +); + +statement ok +create table rw_customers (id bigint, modified timestamp, custinfo jsonb, primary key (id)) from mysql_source table 'mytest.customers'; + +# Name, Type, Is Hidden, Description +query TTTT +describe rw_customers; +---- +id bigint false NULL +modified timestamp without time zone false NULL +custinfo jsonb false NULL +primary key id NULL NULL +distribution key id NULL NULL +table description rw_customers NULL NULL + + +system ok +mysql -e " + USE mytest; + ALTER TABLE customers ADD COLUMN v1 VARCHAR(255); + ALTER TABLE customers ADD COLUMN v2 double(5,2); +" + +sleep 3s + +# Name, Type, Is Hidden, Description +query TTTT +describe rw_customers; +---- +id bigint false NULL +modified timestamp without time zone false NULL +custinfo jsonb false NULL +v1 character varying false NULL +v2 double precision false NULL +primary key id NULL NULL +distribution key id NULL NULL +table description rw_customers NULL NULL + + +statement ok +drop source mysql_source cascade; diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties index 0c62a51986b1c..b5b7b71882c08 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties @@ -13,7 +13,7 @@ table.include.list=${database.name}.${table.name:-*} schema.history.internal.store.only.captured.tables.ddl=true schema.history.internal.store.only.captured.databases.ddl=true # default to disable schema change events -include.schema.changes=${debezium.include.schema.changes:-false} +include.schema.changes=${auto.schema.change:-false} database.server.id=${server.id} # default to use unencrypted connection database.ssl.mode=${ssl.mode:-disabled} diff --git a/proto/catalog.proto b/proto/catalog.proto index e13a99ac3c2f9..5894294290c21 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -411,6 +411,11 @@ message Table { // conflict" operations. optional uint32 version_column_index = 38; + // The unique identifier of the upstream table if it is a CDC table. + // It will be used in auto schema change to get the Table which mapped to the + // upstream table. + optional string cdc_table_id = 39; + // Per-table catalog version, used by schema change. `None` for internal // tables and tests. Not to be confused with the global catalog version for // notification service. diff --git a/proto/common.proto b/proto/common.proto index d6c596ec4c497..05d938cc26523 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -54,6 +54,8 @@ message WorkerNode { bool is_streaming = 1; bool is_serving = 2; bool is_unschedulable = 3; + // This is used for frontend node to register its rpc address + string internal_rpc_host_addr = 4; } message Resource { string rw_version = 1; diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index f78c08e2a9b52..26212d659206a 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -454,7 +454,7 @@ message TableSchemaChange { } TableChangeType change_type = 1; - string cdc_table_name = 2; + string cdc_table_id = 2; repeated plan_common.ColumnCatalog columns = 3; } @@ -462,6 +462,12 @@ message SchemaChangeEnvelope { repeated TableSchemaChange table_changes = 1; } +message AutoSchemaChangeRequest { + SchemaChangeEnvelope schema_change = 1; +} + +message AutoSchemaChangeResponse {} + service DdlService { rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse); rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse); @@ -500,4 +506,5 @@ service DdlService { rpc GetTables(GetTablesRequest) returns (GetTablesResponse); rpc Wait(WaitRequest) returns (WaitResponse); rpc CommentOn(CommentOnRequest) returns (CommentOnResponse); + rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse); } diff --git a/proto/frontend_service.proto b/proto/frontend_service.proto new file mode 100644 index 0000000000000..7cdac815fde66 --- /dev/null +++ b/proto/frontend_service.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package frontend_service; + +import "ddl_service.proto"; + +option java_package = "com.risingwave.proto"; +option optimize_for = SPEED; + +message GetTableReplacePlanRequest { + uint32 database_id = 1; + uint32 owner = 2; + string table_name = 3; + ddl_service.TableSchemaChange table_change = 4; +} + +message GetTableReplacePlanResponse { + ddl_service.ReplaceTablePlan replace_plan = 1; +} + +service FrontendService { + rpc GetTableReplacePlan(GetTableReplacePlanRequest) returns (GetTableReplacePlanResponse); +} diff --git a/proto/meta.proto b/proto/meta.proto index bcb6c331549f2..240c63cc90474 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -312,6 +312,8 @@ message AddWorkerNodeRequest { bool is_streaming = 2; bool is_serving = 3; bool is_unschedulable = 4; + // This is used for frontend node to register its rpc address + string internal_rpc_host_addr = 5; } common.WorkerType worker_type = 1; common.HostAddress host = 2; diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index e01ee9e4b7dee..c4862556ae826 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -174,6 +174,7 @@ impl SourceExecutor { rate_limit: None, }, ConnectorProperties::default(), + None, )); let stream = self .source diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs index c4f4829c36110..80cd2806f2b64 100644 --- a/src/batch/src/worker_manager/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -430,6 +430,7 @@ mod tests { is_unschedulable: false, is_serving: true, is_streaming: true, + internal_rpc_host_addr: "".to_string(), }), transactional_id: Some(1), ..Default::default() @@ -444,6 +445,7 @@ mod tests { is_unschedulable: false, is_serving: true, is_streaming: false, + internal_rpc_host_addr: "".to_string(), }), transactional_id: Some(2), ..Default::default() diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 27c8c40203397..2d09461959d0b 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -505,7 +505,7 @@ mod test { ], ), prometheus_listener_addr: "127.0.0.1:1234", - health_check_listener_addr: "127.0.0.1:6786", + frontend_rpc_listener_addr: "127.0.0.1:6786", config_path: "src/config/test.toml", metrics_level: None, enable_barrier_read: None, diff --git a/src/common/src/config.rs b/src/common/src/config.rs index a5e176415b860..50e1ce853ebe6 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1027,6 +1027,10 @@ pub struct StreamingDeveloperConfig { /// If not specified, the value of `server.connection_pool_size` will be used. #[serde(default = "default::developer::stream_exchange_connection_pool_size")] pub exchange_connection_pool_size: Option, + + /// A flag to allow disabling the auto schema change handling + #[serde(default = "default::developer::stream_enable_auto_schema_change")] + pub enable_auto_schema_change: bool, } /// The subsections `[batch.developer]`. @@ -1903,6 +1907,10 @@ pub mod default { pub fn enable_actor_tokio_metrics() -> bool { false } + + pub fn stream_enable_auto_schema_change() -> bool { + true + } } pub use crate::system_param::default as system; diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs index 328287708a8a2..5619ffc6e0f96 100644 --- a/src/common/src/vnode_mapping/vnode_placement.rs +++ b/src/common/src/vnode_mapping/vnode_placement.rs @@ -213,6 +213,7 @@ mod tests { is_unschedulable: false, is_serving: true, is_streaming: false, + internal_rpc_host_addr: "".to_string(), }; let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| { @@ -231,7 +232,7 @@ mod tests { let worker_1 = WorkerNode { id: 1, parallelism: 1, - property: Some(serving_property), + property: Some(serving_property.clone()), ..Default::default() }; @@ -246,7 +247,7 @@ mod tests { let worker_2 = WorkerNode { id: 2, parallelism: 50, - property: Some(serving_property), + property: Some(serving_property.clone()), ..Default::default() }; @@ -265,7 +266,7 @@ mod tests { let worker_3 = WorkerNode { id: 3, parallelism: 60, - property: Some(serving_property), + property: Some(serving_property.clone()), ..Default::default() }; let re_pu_mapping_2 = place_vnode( diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 3270897c29d31..158cca95029af 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -127,6 +127,7 @@ pub async fn compute_node_serve( is_streaming: opts.role.for_streaming(), is_serving: opts.role.for_serving(), is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }, &config.meta, ) diff --git a/src/config/example.toml b/src/config/example.toml index 73b49440e7ec0..e9076bf9bf084 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -124,6 +124,7 @@ stream_enable_arrangement_backfill = true stream_high_join_amplification_threshold = 2048 stream_enable_actor_tokio_metrics = false stream_exchange_connection_pool_size = 1 +stream_enable_auto_schema_change = true [storage] share_buffers_sync_parallelism = 1 diff --git a/src/connector/src/parser/debezium/schema_change.rs b/src/connector/src/parser/debezium/schema_change.rs index 4c61b52caaba9..c586257ae696b 100644 --- a/src/connector/src/parser/debezium/schema_change.rs +++ b/src/connector/src/parser/debezium/schema_change.rs @@ -65,12 +65,16 @@ impl From<&str> for TableChangeType { #[derive(Debug)] pub struct TableSchemaChange { - pub(crate) cdc_table_name: String, + pub(crate) cdc_table_id: String, pub(crate) columns: Vec, pub(crate) change_type: TableChangeType, } impl SchemaChangeEnvelope { + pub fn is_empty(&self) -> bool { + self.table_changes.is_empty() + } + pub fn to_protobuf(&self) -> PbSchemaChangeEnvelope { let table_changes = self .table_changes @@ -83,7 +87,7 @@ impl SchemaChangeEnvelope { .collect(); PbTableSchemaChange { change_type: table_change.change_type.to_proto() as _, - cdc_table_name: table_change.cdc_table_name.clone(), + cdc_table_id: table_change.cdc_table_id.clone(), columns, } }) @@ -91,4 +95,11 @@ impl SchemaChangeEnvelope { PbSchemaChangeEnvelope { table_changes } } + + pub fn table_names(&self) -> Vec { + self.table_changes + .iter() + .map(|table_change| table_change.cdc_table_id.clone()) + .collect() + } } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 6cd843c0384b2..4b14654bf518d 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -836,8 +836,26 @@ async fn into_chunk_stream_inner( } }, - Ok(ParseResult::SchemaChange(_)) => { - // TODO + Ok(ParseResult::SchemaChange(schema_change)) => { + if schema_change.is_empty() { + continue; + } + + let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); + // we bubble up the schema change event to the source executor via channel, + // and wait for the source executor to finish the schema change process before + // parsing the following messages. + if let Some(ref tx) = parser.source_ctx().schema_change_tx { + tx.send((schema_change, oneshot_tx)) + .await + .expect("send schema change to executor"); + match oneshot_rx.await { + Ok(()) => {} + Err(e) => { + tracing::error!(error = %e.as_report(), "failed to wait for schema change"); + } + } + } } } } diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index 663fcb30e6ac9..d820eb0a26dba 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -508,7 +508,7 @@ mod tests { SchemaChangeEnvelope { table_changes: [ TableSchemaChange { - cdc_table_name: "mydb.test", + cdc_table_id: "mydb.test", columns: [ ColumnCatalog { column_desc: ColumnDesc { diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 5b9e6e1c1e720..a623aeb70bb61 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -211,7 +211,7 @@ pub fn parse_schema_change( } } schema_changes.push(TableSchemaChange { - cdc_table_name: id.replace('"', ""), // remove the double quotes + cdc_table_id: id.replace('"', ""), // remove the double quotes columns: column_descs .into_iter() .map(|column_desc| ColumnCatalog { diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index dc00fee24200e..fd3fad8275154 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -32,6 +32,7 @@ use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo}; use risingwave_pb::plan_common::ExternalTableDesc; use risingwave_pb::source::ConnectorSplit; use serde::de::DeserializeOwned; +use tokio::sync::mpsc; use super::cdc::DebeziumCdcMeta; use super::datagen::DatagenMeta; @@ -42,6 +43,7 @@ use super::monitor::SourceMetrics; use super::nexmark::source::message::NexmarkMeta; use super::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR}; use crate::error::ConnectorResult as Result; +use crate::parser::schema_change::SchemaChangeEnvelope; use crate::parser::ParserConfig; use crate::source::filesystem::FsPageItem; use crate::source::monitor::EnumeratorMetrics; @@ -178,6 +180,9 @@ pub struct SourceContext { pub metrics: Arc, pub source_ctrl_opts: SourceCtrlOpts, pub connector_props: ConnectorProperties, + // source parser put schema change event into this channel + pub schema_change_tx: + Option)>>, } impl SourceContext { @@ -189,6 +194,9 @@ impl SourceContext { metrics: Arc, source_ctrl_opts: SourceCtrlOpts, connector_props: ConnectorProperties, + schema_change_channel: Option< + mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>, + >, ) -> Self { Self { actor_id, @@ -198,6 +206,7 @@ impl SourceContext { metrics, source_ctrl_opts, connector_props, + schema_change_tx: schema_change_channel, } } @@ -215,6 +224,7 @@ impl SourceContext { rate_limit: None, }, ConnectorProperties::default(), + None, ) } } diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 6947ba7a46d6b..0e7ec02cfac27 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -145,6 +145,7 @@ pub fn type_name_to_mysql_type(ty_name: &str) -> Option { $( $name => Some(ColumnType::$variant(Default::default())), )* + "json" => Some(ColumnType::Json), _ => None, } }; diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index 605dd47ecd0e3..c56749de9b9b8 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -45,6 +45,7 @@ pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size"; // We enable transaction for shared cdc source by default pub const CDC_TRANSACTIONAL_KEY: &str = "transactional"; pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout"; +pub const CDC_AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change"; pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME; pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME; diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index a6c488922022d..c491fc3bf44b2 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -154,7 +154,9 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an Worker::insert(worker::ActiveModel::from(&worker.worker_node)) .exec(&meta_store_sql.conn) .await?; - if worker.worker_type() == WorkerType::ComputeNode { + if worker.worker_type() == WorkerType::ComputeNode + || worker.worker_type() == WorkerType::Frontend + { let pb_property = worker.worker_node.property.as_ref().unwrap(); let property = worker_property::ActiveModel { worker_id: Set(worker.worker_id() as _), @@ -162,6 +164,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an is_serving: Set(pb_property.is_serving), is_unschedulable: Set(pb_property.is_unschedulable), parallelism: Set(worker.worker_node.parallelism() as _), + internal_rpc_host_addr: Set(pb_property.internal_rpc_host_addr.clone()), }; WorkerProperty::insert(property) .exec(&meta_store_sql.conn) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs index 6c21f524e684e..b50c7e4cfd07b 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs @@ -15,6 +15,7 @@ use itertools::Itertools; use risingwave_common::types::{Fields, Timestamptz}; use risingwave_frontend_macro::system_catalog; +use risingwave_pb::common::WorkerType; use crate::catalog::system_catalog::SysCatalogReaderImpl; use crate::error::Result; @@ -32,6 +33,7 @@ struct RwWorkerNode { is_streaming: Option, is_serving: Option, is_unschedulable: Option, + internal_rpc_host_addr: Option, rw_version: Option, system_total_memory_bytes: Option, system_total_cpu_cores: Option, @@ -49,6 +51,7 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result Result, pub initialized_at_cluster_version: Option, + + pub cdc_table_id: Option, } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -425,6 +427,7 @@ impl TableCatalog { created_at_cluster_version: self.created_at_cluster_version.clone(), initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), retention_seconds: self.retention_seconds, + cdc_table_id: self.cdc_table_id.clone(), } } @@ -598,6 +601,7 @@ impl From for TableCatalog { .into_iter() .map(TableId::from) .collect_vec(), + cdc_table_id: tb.cdc_table_id, } } } @@ -687,6 +691,7 @@ mod tests { created_at_cluster_version: None, initialized_at_cluster_version: None, version_column_index: None, + cdc_table_id: None, } .into(); @@ -749,6 +754,7 @@ mod tests { initialized_at_cluster_version: None, dependent_relations: vec![], version_column_index: None, + cdc_table_id: None, } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index ba3ae95399783..1e11e390edfd8 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -12,20 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use anyhow::Context; +use anyhow::{anyhow, Context}; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::ColumnCatalog; +use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::{bail, bail_not_implemented}; use risingwave_connector::sink::catalog::SinkCatalog; +use risingwave_pb::catalog::{Source, Table}; +use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph}; use risingwave_sqlparser::ast::{ - AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement, + AlterTableOperation, ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Encode, + ObjectName, Statement, StructField, }; use risingwave_sqlparser::parser::Parser; @@ -48,6 +52,122 @@ pub async fn replace_table_with_definition( original_catalog: &Arc, source_schema: Option, ) -> Result<()> { + let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan( + session, + table_name, + definition, + original_catalog, + source_schema, + ) + .await?; + + let catalog_writer = session.catalog_writer()?; + + catalog_writer + .replace_table(source, table, graph, col_index_mapping, job_type) + .await?; + Ok(()) +} + +pub async fn get_new_table_definition_for_cdc_table( + session: &Arc, + table_name: ObjectName, + new_columns: Vec, +) -> Result<(Statement, Arc)> { + let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?; + + // Retrieve the original table definition and parse it to AST. + let [mut definition]: [_; 1] = Parser::parse_sql(&original_catalog.definition) + .context("unable to parse original table definition")? + .try_into() + .unwrap(); + let Statement::CreateTable { + columns: original_columns, + source_schema, + .. + } = &mut definition + else { + panic!("unexpected statement: {:?}", definition); + }; + + assert!( + source_schema.is_none(), + "source schema should be None for CDC table" + ); + + let orig_column_map: HashMap = HashMap::from_iter( + original_columns + .iter() + .map(|col| (col.name.real_value(), col.clone())), + ); + + // update the original columns with new version columns + let mut new_column_defs = vec![]; + for col in new_columns { + // if the column exists in the original definitoins, use the original column definition. + // since we don't support altering the column type right now + if let Some(original_col) = orig_column_map.get(col.name()) { + new_column_defs.push(original_col.clone()); + } else { + let ty = to_ast_data_type(col.data_type())?; + new_column_defs.push(ColumnDef::new(col.name().into(), ty, None, vec![])); + } + } + *original_columns = new_column_defs; + + Ok((definition, original_catalog)) +} + +fn to_ast_data_type(ty: &DataType) -> Result { + match ty { + DataType::Boolean => Ok(AstDataType::Boolean), + DataType::Int16 => Ok(AstDataType::SmallInt), + DataType::Int32 => Ok(AstDataType::Int), + DataType::Int64 => Ok(AstDataType::BigInt), + DataType::Float32 => Ok(AstDataType::Real), + DataType::Float64 => Ok(AstDataType::Double), + // TODO: handle precision and scale for decimal + DataType::Decimal => Ok(AstDataType::Decimal(None, None)), + DataType::Date => Ok(AstDataType::Date), + DataType::Varchar => Ok(AstDataType::Varchar), + DataType::Time => Ok(AstDataType::Time(false)), + DataType::Timestamp => Ok(AstDataType::Timestamp(false)), + DataType::Timestamptz => Ok(AstDataType::Timestamp(true)), + DataType::Interval => Ok(AstDataType::Interval), + DataType::Jsonb => Ok(AstDataType::Jsonb), + DataType::Bytea => Ok(AstDataType::Bytea), + DataType::List(item_ty) => Ok(AstDataType::Array(Box::new(to_ast_data_type(item_ty)?))), + DataType::Struct(fields) => { + let fields = fields + .iter() + .map(|(name, ty)| { + Ok::(StructField { + name: name.into(), + data_type: to_ast_data_type(ty)?, + }) + }) + .try_collect()?; + Ok(AstDataType::Struct(fields)) + } + DataType::Serial | DataType::Int256 | DataType::Map(_) => { + Err(anyhow!("unsupported data type: {:?}", ty).context("to_ast_data_type"))? + } + } +} + +pub async fn get_replace_table_plan( + session: &Arc, + table_name: ObjectName, + definition: Statement, + original_catalog: &Arc, + source_schema: Option, +) -> Result<( + Option, + Table, + StreamFragmentGraph, + ColIndexMapping, + TableJobType, +)> { // Create handler args as if we're creating a new table with the altered definition. let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?; let col_id_gen = ColumnIdGenerator::new_alter(original_catalog); @@ -117,12 +237,7 @@ pub async fn replace_table_with_definition( table.incoming_sinks = incoming_sink_ids.iter().copied().collect(); - let catalog_writer = session.catalog_writer()?; - - catalog_writer - .replace_table(source, table, graph, col_index_mapping, job_type) - .await?; - Ok(()) + Ok((source, table, graph, col_index_mapping, job_type)) } pub(crate) fn hijack_merger_for_target_table( diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 9c7a264816a6a..c4048dd7cac01 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -44,9 +44,9 @@ use risingwave_connector::schema::schema_registry::{ use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY; use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::cdc::{ - CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, - CDC_WAIT_FOR_STREAMING_START_TIMEOUT, CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, - MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR, + CDC_AUTO_SCHEMA_CHANGE_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, + CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT, CITUS_CDC_CONNECTOR, + MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR, }; use risingwave_connector::source::datagen::DATAGEN_CONNECTOR; use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; @@ -1414,6 +1414,16 @@ pub fn bind_connector_props( )))); } if is_create_source && create_cdc_source_job { + if let Some(value) = with_properties.get(CDC_AUTO_SCHEMA_CHANGE_KEY) + && value + .parse::() + .map_err(|_| anyhow!("invalid value of '{}' option", CDC_AUTO_SCHEMA_CHANGE_KEY))? + { + Feature::CdcAutoSchemaChange + .check_available() + .map_err(|e| anyhow::anyhow!(e))?; + } + // set connector to backfill mode with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into()); // enable cdc sharing mode, which will capture all tables in the given `database.name` diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 6e57ab52c87fe..3fcfca881508d 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -734,6 +734,7 @@ fn gen_table_plan_inner( version, is_external_source, retention_seconds, + None, )?; let mut table = materialize.table().to_prost(schema_id, database_id); @@ -817,7 +818,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( let options = CdcScanOptions::from_with_options(context.with_options())?; let logical_scan = LogicalCdcScan::create( - external_table_name, + external_table_name.clone(), Rc::new(cdc_table_desc), context.clone(), options, @@ -847,6 +848,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( Some(col_id_gen.into_version()), true, None, + Some(external_table_name), )?; let mut table = materialize.table().to_prost(schema_id, database_id); diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 9cacbdcb8ce34..42de3a116d0b6 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -100,6 +100,8 @@ pub mod util; pub mod variable; mod wait; +pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan}; + /// The [`PgResponseBuilder`] used by RisingWave. pub type RwPgResponseBuilder = PgResponseBuilder; diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 4bdf9fa398f77..003700feb6763 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -73,6 +73,7 @@ mod user; pub mod health_service; mod monitor; +pub mod rpc; mod telemetry; use std::ffi::OsString; @@ -118,10 +119,11 @@ pub struct FrontendOpts { #[clap( long, + alias = "health-check-listener-addr", env = "RW_HEALTH_CHECK_LISTENER_ADDR", default_value = "127.0.0.1:6786" )] - pub health_check_listener_addr: String, + pub frontend_rpc_listener_addr: String, /// The path of `risingwave.toml` configuration file. /// @@ -174,6 +176,8 @@ use std::pin::Pin; use pgwire::pg_protocol::TlsConfig; +use crate::session::SESSION_MANAGER; + /// Start frontend pub fn start( opts: FrontendOpts, @@ -183,7 +187,8 @@ pub fn start( // slow compile in release mode. Box::pin(async move { let listen_addr = opts.listen_addr.clone(); - let session_mgr = SessionManagerImpl::new(opts).await.unwrap(); + let session_mgr = Arc::new(SessionManagerImpl::new(opts).await.unwrap()); + SESSION_MANAGER.get_or_init(|| session_mgr.clone()); let redact_sql_option_keywords = Arc::new( session_mgr .env() @@ -196,7 +201,7 @@ pub fn start( pg_serve( &listen_addr, - session_mgr, + session_mgr.clone(), TlsConfig::new_default(), Some(redact_sql_option_keywords), shutdown, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index b824e052e50bf..e263cc36b6a99 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -637,6 +637,7 @@ impl PlanRoot { version: Option, with_external_source: bool, retention_seconds: Option, + cdc_table_name: Option, ) -> Result { assert_eq!(self.phase, PlanPhase::Logical); assert_eq!(self.plan.convention(), Convention::Logical); @@ -867,6 +868,7 @@ impl PlanRoot { row_id_index, version, retention_seconds, + cdc_table_name, ) } diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 865dc71191b46..01474532ebb5c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -133,10 +133,11 @@ impl StreamMaterialize { row_id_index: Option, version: Option, retention_seconds: Option, + cdc_table_name: Option, ) -> Result { let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?; - let table = Self::derive_table_catalog( + let mut table = Self::derive_table_catalog( input.clone(), name, user_order_by, @@ -153,6 +154,8 @@ impl StreamMaterialize { CreateType::Foreground, )?; + table.cdc_table_id = cdc_table_name; + Ok(Self::new(input, table)) } @@ -279,6 +282,7 @@ impl StreamMaterialize { initialized_at_cluster_version: None, created_at_cluster_version: None, retention_seconds: retention_seconds.map(|i| i.into()), + cdc_table_id: None, }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 2ac317d597bad..bc3c223c615e6 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -178,6 +178,7 @@ impl TableCatalogBuilder { initialized_at_cluster_version: None, created_at_cluster_version: None, retention_seconds: None, + cdc_table_id: None, } } diff --git a/src/frontend/src/rpc/mod.rs b/src/frontend/src/rpc/mod.rs new file mode 100644 index 0000000000000..257695cc99e48 --- /dev/null +++ b/src/frontend/src/rpc/mod.rs @@ -0,0 +1,113 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pgwire::pg_server::{BoxedError, SessionManager}; +use risingwave_pb::ddl_service::{ReplaceTablePlan, TableSchemaChange}; +use risingwave_pb::frontend_service::frontend_service_server::FrontendService; +use risingwave_pb::frontend_service::{GetTableReplacePlanRequest, GetTableReplacePlanResponse}; +use risingwave_rpc_client::error::ToTonicStatus; +use risingwave_sqlparser::ast::ObjectName; +use tonic::{Request as RpcRequest, Response as RpcResponse, Status}; + +use crate::error::RwError; +use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan}; +use crate::session::SESSION_MANAGER; + +#[derive(thiserror::Error, Debug)] +pub enum AutoSchemaChangeError { + #[error("frontend error")] + FrontendError( + #[from] + #[backtrace] + RwError, + ), +} + +impl From for AutoSchemaChangeError { + fn from(err: BoxedError) -> Self { + AutoSchemaChangeError::FrontendError(RwError::from(err)) + } +} + +impl From for tonic::Status { + fn from(err: AutoSchemaChangeError) -> Self { + err.to_status(tonic::Code::Internal, "frontend") + } +} + +#[derive(Default)] +pub struct FrontendServiceImpl {} + +impl FrontendServiceImpl { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait::async_trait] +impl FrontendService for FrontendServiceImpl { + async fn get_table_replace_plan( + &self, + request: RpcRequest, + ) -> Result, Status> { + let req = request.into_inner(); + tracing::info!("get_table_replace_plan for table {}", req.table_name); + + let table_change = req.table_change.expect("schema change message is required"); + let replace_plan = + get_new_table_plan(table_change, req.table_name, req.database_id, req.owner).await?; + + Ok(RpcResponse::new(GetTableReplacePlanResponse { + replace_plan: Some(replace_plan), + })) + } +} + +/// Get the new table plan for the given table schema change +async fn get_new_table_plan( + table_change: TableSchemaChange, + table_name: String, + database_id: u32, + owner: u32, +) -> Result { + tracing::info!("get_new_table_plan for table {}", table_name); + + let session_mgr = SESSION_MANAGER + .get() + .expect("session manager has been initialized"); + + // get a session object for the corresponding user and database + let session = session_mgr.create_dummy_session(database_id, owner)?; + + let new_columns = table_change.columns.into_iter().map(|c| c.into()).collect(); + let table_name = ObjectName::from(vec![table_name.as_str().into()]); + let (new_table_definition, original_catalog) = + get_new_table_definition_for_cdc_table(&session, table_name.clone(), new_columns).await?; + let (_, table, graph, col_index_mapping, job_type) = get_replace_table_plan( + &session, + table_name, + new_table_definition, + &original_catalog, + None, + ) + .await?; + + Ok(ReplaceTablePlan { + table: Some(table), + fragment_graph: Some(graph), + table_col_index_mapping: Some(col_index_mapping.to_protobuf()), + source: None, // none for cdc table + job_type: job_type as _, + }) +} diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index ce13960ab221f..b991d86eca2b7 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -596,6 +596,7 @@ pub(crate) mod tests { incoming_sinks: vec![], initialized_at_cluster_version: None, created_at_cluster_version: None, + cdc_table_id: None, }; let batch_plan_node: PlanRef = LogicalScan::create( "".to_string(), @@ -680,6 +681,7 @@ pub(crate) mod tests { is_unschedulable: false, is_serving: true, is_streaming: true, + internal_rpc_host_addr: "".to_string(), }), transactional_id: Some(0), ..Default::default() @@ -697,6 +699,7 @@ pub(crate) mod tests { is_unschedulable: false, is_serving: true, is_streaming: true, + internal_rpc_host_addr: "".to_string(), }), transactional_id: Some(1), ..Default::default() @@ -714,6 +717,7 @@ pub(crate) mod tests { is_unschedulable: false, is_serving: true, is_streaming: true, + internal_rpc_host_addr: "".to_string(), }), transactional_id: Some(2), ..Default::default() diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index b970899ef080f..374c045c5bc27 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::io::{Error, ErrorKind}; -#[cfg(test)] use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::{Arc, Weak}; @@ -68,7 +67,9 @@ use risingwave_common_heap_profiling::HeapProfiler; use risingwave_common_service::{MetricsManager, ObserverManager}; use risingwave_connector::source::monitor::{SourceMetrics, GLOBAL_SOURCE_METRICS}; use risingwave_pb::common::WorkerType; +use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer; use risingwave_pb::health::health_server::HealthServer; +use risingwave_pb::meta::add_worker_node_request::Property as AddWorkerNodeProperty; use risingwave_pb::user::auth_info::EncryptionType; use risingwave_pb::user::grant_privilege::Object; use risingwave_rpc_client::{ComputeClientPool, ComputeClientPoolRef, MetaClient}; @@ -80,6 +81,7 @@ use tokio::sync::oneshot::Sender; use tokio::sync::watch; use tokio::task::JoinHandle; use tracing::info; +use tracing::log::error; use self::cursor_manager::CursorManager; use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError}; @@ -105,6 +107,7 @@ use crate::health_service::HealthServiceImpl; use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl}; use crate::monitor::{FrontendMetrics, GLOBAL_FRONTEND_METRICS}; use crate::observer::FrontendObserverNode; +use crate::rpc::FrontendServiceImpl; use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef}; use crate::scheduler::{ DistributedQueryMetrics, HummockSnapshotManager, HummockSnapshotManagerRef, QueryManager, @@ -259,12 +262,21 @@ impl FrontendEnv { .unwrap(); info!("advertise addr is {}", frontend_address); + let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap(); + let internal_rpc_host_addr = HostAddr { + // Use the host of advertise address for the frontend rpc address. + host: frontend_address.host.clone(), + port: rpc_addr.port, + }; // Register in meta by calling `AddWorkerNode` RPC. let (meta_client, system_params_reader) = MetaClient::register_new( opts.meta_addr, WorkerType::Frontend, &frontend_address, - Default::default(), + AddWorkerNodeProperty { + internal_rpc_host_addr: internal_rpc_host_addr.to_string(), + ..Default::default() + }, &config.meta, ) .await?; @@ -352,7 +364,8 @@ impl FrontendEnv { } let health_srv = HealthServiceImpl::new(); - let host = opts.health_check_listener_addr.clone(); + let frontend_srv = FrontendServiceImpl::new(); + let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap(); let telemetry_manager = TelemetryManager::new( Arc::new(meta_client.clone()), @@ -372,13 +385,14 @@ impl FrontendEnv { tokio::spawn(async move { tonic::transport::Server::builder() .add_service(HealthServer::new(health_srv)) - .serve(host.parse().unwrap()) + .add_service(FrontendServiceServer::new(frontend_srv)) + .serve(frontend_rpc_addr) .await .unwrap(); }); info!( "Health Check RPC Listener is set up on {}", - opts.health_check_listener_addr.clone() + opts.frontend_rpc_listener_addr.clone() ); let creating_streaming_job_tracker = @@ -1118,6 +1132,9 @@ impl SessionImpl { } } +pub static SESSION_MANAGER: std::sync::OnceLock> = + std::sync::OnceLock::new(); + pub struct SessionManagerImpl { env: FrontendEnv, _join_handles: Vec>, @@ -1128,6 +1145,27 @@ pub struct SessionManagerImpl { impl SessionManager for SessionManagerImpl { type Session = SessionImpl; + fn create_dummy_session( + &self, + database_id: u32, + user_id: u32, + ) -> std::result::Result, BoxedError> { + let dummy_addr = Address::Tcp(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + 5691, // port of meta + )); + let user_reader = self.env.user_info_reader(); + let reader = user_reader.read_guard(); + if let Some(user_name) = reader.get_user_name_by_id(user_id) { + self.connect_inner(database_id, user_name.as_str(), Arc::new(dummy_addr)) + } else { + Err(Box::new(Error::new( + ErrorKind::InvalidInput, + format!("Role id {} does not exist", user_id), + ))) + } + } + fn connect( &self, database: &str, @@ -1145,6 +1183,89 @@ impl SessionManager for SessionManagerImpl { )) })? .id(); + + self.connect_inner(database_id, user_name, peer_addr) + } + + /// Used when cancel request happened. + fn cancel_queries_in_session(&self, session_id: SessionId) { + self.env.cancel_queries_in_session(session_id); + } + + fn cancel_creating_jobs_in_session(&self, session_id: SessionId) { + self.env.cancel_creating_jobs_in_session(session_id); + } + + fn end_session(&self, session: &Self::Session) { + self.delete_session(&session.session_id()); + } + + async fn shutdown(&self) { + // Clean up the session map. + self.env.sessions_map().write().clear(); + // Unregister from the meta service. + self.env.meta_client().try_unregister().await; + } +} + +impl SessionManagerImpl { + pub async fn new(opts: FrontendOpts) -> Result { + // TODO(shutdown): only save join handles that **need** to be shutdown + let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?; + Ok(Self { + env, + _join_handles: join_handles, + _shutdown_senders: shutdown_senders, + number: AtomicI32::new(0), + }) + } + + pub fn env(&self) -> &FrontendEnv { + &self.env + } + + fn insert_session(&self, session: Arc) { + let active_sessions = { + let mut write_guard = self.env.sessions_map.write(); + write_guard.insert(session.id(), session); + write_guard.len() + }; + self.env + .frontend_metrics + .active_sessions + .set(active_sessions as i64); + } + + fn delete_session(&self, session_id: &SessionId) { + let active_sessions = { + let mut write_guard = self.env.sessions_map.write(); + write_guard.remove(session_id); + write_guard.len() + }; + self.env + .frontend_metrics + .active_sessions + .set(active_sessions as i64); + } + + fn connect_inner( + &self, + database_id: u32, + user_name: &str, + peer_addr: AddressRef, + ) -> std::result::Result, BoxedError> { + let catalog_reader = self.env.catalog_reader(); + let reader = catalog_reader.read_guard(); + let database_name = reader + .get_database_by_id(&database_id) + .map_err(|_| { + Box::new(Error::new( + ErrorKind::InvalidInput, + format!("database \"{}\" does not exist", database_id), + )) + })? + .name(); + let user_reader = self.env.user_info_reader(); let reader = user_reader.read_guard(); if let Some(user) = reader.get_user_by_name(user_name) { @@ -1199,7 +1320,7 @@ impl SessionManager for SessionManagerImpl { let session_impl: Arc = SessionImpl::new( self.env.clone(), Arc::new(AuthContext::new( - database.to_string(), + database_name.to_string(), user_name.to_string(), user.id, )), @@ -1219,67 +1340,6 @@ impl SessionManager for SessionManagerImpl { ))) } } - - /// Used when cancel request happened. - fn cancel_queries_in_session(&self, session_id: SessionId) { - self.env.cancel_queries_in_session(session_id); - } - - fn cancel_creating_jobs_in_session(&self, session_id: SessionId) { - self.env.cancel_creating_jobs_in_session(session_id); - } - - fn end_session(&self, session: &Self::Session) { - self.delete_session(&session.session_id()); - } - - async fn shutdown(&self) { - // Clean up the session map. - self.env.sessions_map().write().clear(); - // Unregister from the meta service. - self.env.meta_client().try_unregister().await; - } -} - -impl SessionManagerImpl { - pub async fn new(opts: FrontendOpts) -> Result { - // TODO(shutdown): only save join handles that **need** to be shutdown - let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?; - Ok(Self { - env, - _join_handles: join_handles, - _shutdown_senders: shutdown_senders, - number: AtomicI32::new(0), - }) - } - - pub fn env(&self) -> &FrontendEnv { - &self.env - } - - fn insert_session(&self, session: Arc) { - let active_sessions = { - let mut write_guard = self.env.sessions_map.write(); - write_guard.insert(session.id(), session); - write_guard.len() - }; - self.env - .frontend_metrics - .active_sessions - .set(active_sessions as i64); - } - - fn delete_session(&self, session_id: &SessionId) { - let active_sessions = { - let mut write_guard = self.env.sessions_map.write(); - write_guard.remove(session_id); - write_guard.len() - }; - self.env - .frontend_metrics - .active_sessions - .set(active_sessions as i64); - } } impl Session for SessionImpl { diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 24453f766c72c..086d4ff7de251 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -85,6 +85,14 @@ pub struct LocalFrontend { impl SessionManager for LocalFrontend { type Session = SessionImpl; + fn create_dummy_session( + &self, + _database_id: u32, + _user_name: u32, + ) -> std::result::Result, BoxedError> { + unreachable!() + } + fn connect( &self, _database: &str, diff --git a/src/license/src/feature.rs b/src/license/src/feature.rs index e434e3709ac26..8a02cee72875a 100644 --- a/src/license/src/feature.rs +++ b/src/license/src/feature.rs @@ -55,6 +55,7 @@ macro_rules! for_all_features { { CdcTableSchemaMap, Paid, "Automatically map upstream schema to CDC Table."}, { SqlServerSink, Paid, "Sink data from RisingWave to SQL Server." }, { SqlServerCdcSource, Paid, "CDC source connector for Sql Server." }, + { CdcAutoSchemaChange, Paid, "Auto replicate upstream DDL to CDC Table." }, } }; } diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs index dc1f3dd3f9808..08291e5b163d5 100644 --- a/src/meta/model_v2/migration/src/lib.rs +++ b/src/meta/model_v2/migration/src/lib.rs @@ -18,6 +18,7 @@ mod m20240630_131430_remove_parallel_unit; mod m20240701_060504_hummock_time_travel; mod m20240702_080451_system_param_value; mod m20240702_084927_unnecessary_fk; +mod m20240726_063833_auto_schema_change; mod m20240806_143329_add_rate_limit_to_source_catalog; pub struct Migrator; @@ -42,6 +43,7 @@ impl MigratorTrait for Migrator { Box::new(m20240701_060504_hummock_time_travel::Migration), Box::new(m20240702_080451_system_param_value::Migration), Box::new(m20240702_084927_unnecessary_fk::Migration), + Box::new(m20240726_063833_auto_schema_change::Migration), Box::new(m20240806_143329_add_rate_limit_to_source_catalog::Migration), ] } diff --git a/src/meta/model_v2/migration/src/m20240726_063833_auto_schema_change.rs b/src/meta/model_v2/migration/src/m20240726_063833_auto_schema_change.rs new file mode 100644 index 0000000000000..eb5f32ae63b45 --- /dev/null +++ b/src/meta/model_v2/migration/src/m20240726_063833_auto_schema_change.rs @@ -0,0 +1,59 @@ +use sea_orm_migration::prelude::{Table as MigrationTable, *}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Table::Table) + .add_column(ColumnDef::new(Table::CdcTableId).string()) + .to_owned(), + ) + .await?; + + manager + .alter_table( + MigrationTable::alter() + .table(WorkerProperty::Table) + .add_column(ColumnDef::new(WorkerProperty::InternalRpcHostAddr).string()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Table::Table) + .drop_column(Table::CdcTableId) + .to_owned(), + ) + .await?; + + manager + .alter_table( + MigrationTable::alter() + .table(WorkerProperty::Table) + .drop_column(WorkerProperty::InternalRpcHostAddr) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Table { + Table, + CdcTableId, +} + +#[derive(DeriveIden)] +enum WorkerProperty { + Table, + InternalRpcHostAddr, +} diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index 75fb66fcb2a5e..3722658e7294f 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -132,6 +132,7 @@ pub struct Model { pub version: Option, pub retention_seconds: Option, pub incoming_sinks: I32Array, + pub cdc_table_id: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -243,6 +244,7 @@ impl From for ActiveModel { version: Set(pb_table.version.as_ref().map(|v| v.into())), retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)), incoming_sinks: Set(pb_table.incoming_sinks.into()), + cdc_table_id: Set(pb_table.cdc_table_id), } } } diff --git a/src/meta/model_v2/src/worker_property.rs b/src/meta/model_v2/src/worker_property.rs index 64834ae0b13cb..204882646d815 100644 --- a/src/meta/model_v2/src/worker_property.rs +++ b/src/meta/model_v2/src/worker_property.rs @@ -26,6 +26,7 @@ pub struct Model { pub is_streaming: bool, pub is_serving: bool, pub is_unschedulable: bool, + pub internal_rpc_host_addr: String, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index a3ee394e8f18f..bb9e041d716b2 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -16,6 +16,8 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::anyhow; +use rand::seq::SliceRandom; +use rand::thread_rng; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::sink::catalog::SinkId; use risingwave_meta::manager::MetadataManager; @@ -25,10 +27,13 @@ use risingwave_pb::catalog::connection::private_link_service::{ }; use risingwave_pb::catalog::connection::PbPrivateLinkService; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::{connection, Comment, Connection, CreateType, Secret}; +use risingwave_pb::catalog::{connection, Comment, Connection, CreateType, Secret, Table}; +use risingwave_pb::common::worker_node::State; +use risingwave_pb::common::WorkerType; use risingwave_pb::ddl_service::ddl_service_server::DdlService; use risingwave_pb::ddl_service::drop_table_request::PbSourceId; use risingwave_pb::ddl_service::*; +use risingwave_pb::frontend_service::GetTableReplacePlanRequest; use tonic::{Request, Response, Status}; use crate::barrier::BarrierManagerRef; @@ -916,6 +921,79 @@ impl DdlService for DdlServiceImpl { Ok(Response::new(AlterParallelismResponse {})) } + + /// Auto schema change for cdc sources, + /// called by the source parser when a schema change is detected. + async fn auto_schema_change( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + // randomly select a frontend worker to get the replace table plan + let workers = self + .metadata_manager + .list_worker_node(Some(WorkerType::Frontend), Some(State::Running)) + .await?; + let worker = workers + .choose(&mut thread_rng()) + .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?; + + let client = self + .env + .frontend_client_pool() + .get(worker) + .await + .map_err(MetaError::from)?; + + let Some(schema_change) = req.schema_change else { + return Err(Status::invalid_argument( + "schema change message is required", + )); + }; + + for table_change in schema_change.table_changes { + let cdc_table_id = table_change.cdc_table_id.clone(); + // get the table catalog corresponding to the cdc table + let tables: Vec = self + .metadata_manager + .get_table_catalog_by_cdc_table_id(cdc_table_id) + .await?; + + tracing::info!( + "(auto schema change) Table jobs to replace: {:?}", + tables.iter().map(|t| t.id).collect::>() + ); + for table in tables { + // send a request to the frontend to get the ReplaceTablePlan + // will retry with exponential backoff if the request fails + let resp = client + .get_table_replace_plan(GetTableReplacePlanRequest { + database_id: table.database_id, + owner: table.owner, + table_name: table.name, + table_change: Some(table_change.clone()), + }) + .await? + .into_inner(); + + if let Some(plan) = resp.replace_plan { + plan.table + .as_ref() + .inspect(|t| tracing::info!("Table job to replace: {}", t.id)); + // start the schema change procedure + self.ddl_controller + .run_command(DdlCommand::ReplaceTable(Self::extract_replace_table_info( + plan, + ))) + .await?; + tracing::info!("(auto schema change) Table {} replaced success", table.id); + } + } + } + + Ok(Response::new(AutoSchemaChangeResponse {})) + } } impl DdlServiceImpl { diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index e349fadf48373..8ffe3b1ac0782 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -806,7 +806,7 @@ impl GlobalBarrierManager { r#type: node.r#type, host: node.host.clone(), parallelism: node.parallelism, - property: node.property, + property: node.property.clone(), resource: node.resource.clone(), ..Default::default() }, diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 387c5ea6f3c95..5698a96fb3a58 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -2877,6 +2877,22 @@ impl CatalogController { .collect()) } + pub async fn get_table_by_cdc_table_id( + &self, + cdc_table_id: String, + ) -> MetaResult> { + let inner = self.inner.read().await; + let table_objs = Table::find() + .find_also_related(Object) + .filter(table::Column::CdcTableId.eq(cdc_table_id)) + .all(&inner.db) + .await?; + Ok(table_objs + .into_iter() + .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into()) + .collect()) + } + pub async fn get_created_table_ids(&self) -> MetaResult> { let inner = self.inner.read().await; diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 6a7ca826f160a..9fe1451a3eebc 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -84,6 +84,7 @@ impl From for PbWorkerNode { is_streaming: p.is_streaming, is_serving: p.is_serving, is_unschedulable: p.is_unschedulable, + internal_rpc_host_addr: p.internal_rpc_host_addr.clone(), }), transactional_id: info.0.transaction_id.map(|id| id as _), resource: info.2.resource, @@ -670,7 +671,7 @@ impl ClusterControllerInner { }; let insert_res = Worker::insert(worker).exec(&txn).await?; let worker_id = insert_res.last_insert_id as WorkerId; - if r#type == PbWorkerType::ComputeNode { + if r#type == PbWorkerType::ComputeNode || r#type == PbWorkerType::Frontend { let property = worker_property::ActiveModel { worker_id: Set(worker_id), parallelism: Set(add_property @@ -680,6 +681,7 @@ impl ClusterControllerInner { is_streaming: Set(add_property.is_streaming), is_serving: Set(add_property.is_serving), is_unschedulable: Set(add_property.is_unschedulable), + internal_rpc_host_addr: Set(add_property.internal_rpc_host_addr), }; WorkerProperty::insert(property).exec(&txn).await?; } @@ -926,6 +928,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }; let hosts = mock_worker_hosts_for_test(worker_count); let mut worker_ids = vec![]; @@ -935,7 +938,7 @@ mod tests { .add_worker( PbWorkerType::ComputeNode, host.clone(), - property, + property.clone(), PbResource::default(), ) .await?, @@ -967,7 +970,7 @@ mod tests { ); // re-register existing worker node with larger parallelism and change its serving mode. - let mut new_property = property; + let mut new_property = property.clone(); new_property.worker_node_parallelism = (parallelism_num * 2) as _; new_property.is_serving = false; cluster_ctl @@ -1016,12 +1019,13 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }; let worker_id = cluster_ctl .add_worker( PbWorkerType::ComputeNode, host.clone(), - property, + property.clone(), PbResource::default(), ) .await?; diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 6b56113aa38a1..c1881ab1a65af 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -162,6 +162,7 @@ impl From> for PbTable { initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, retention_seconds: value.0.retention_seconds.map(|id| id as u32), + cdc_table_id: value.0.cdc_table_id, } } } diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 5fbec419cc7fe..2d25f196a60bf 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -405,6 +405,7 @@ async fn test_release_context_resource() { is_streaming: true, is_serving: true, is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) @@ -487,6 +488,7 @@ async fn test_hummock_manager_basic() { is_streaming: true, is_serving: true, is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 9a764f871cd63..886af7bddc622 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -352,6 +352,7 @@ pub async fn setup_compute_env_with_metric( is_streaming: true, is_serving: true, is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index da1238d43d54b..e39b41cbaff0d 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -248,6 +248,15 @@ impl DatabaseManager { .collect() } + pub fn get_table_by_cdc_table_id(&self, cdc_table_id: String) -> Vec
{ + let cdc_table_id = Some(cdc_table_id); + self.tables + .values() + .filter(|t| t.cdc_table_id == cdc_table_id) + .cloned() + .collect() + } + pub fn check_relation_name_duplicated(&self, relation_key: &RelationKey) -> MetaResult<()> { if let Some(t) = self.tables.values().find(|x| { x.database_id == relation_key.0 diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 25c96dd4ec630..29d7cd754dd4f 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -4119,6 +4119,14 @@ impl CatalogManager { .get_table_name_and_type_mapping() } + pub async fn get_table_by_cdc_table_id(&self, cdc_table_id: String) -> Vec
{ + self.core + .lock() + .await + .database + .get_table_by_cdc_table_id(cdc_table_id) + } + /// `list_stream_job_ids` returns all running and creating stream job ids, this is for recovery /// clean up progress. pub async fn list_stream_job_ids(&self) -> MetaResult> { diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 93e50dec3706b..4cae782060b42 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -491,11 +491,12 @@ impl ClusterManager { worker_type: WorkerType, worker_property: AddNodeProperty, ) -> Option { - if worker_type == WorkerType::ComputeNode { + if worker_type == WorkerType::ComputeNode || worker_type == WorkerType::Frontend { Some(Property { is_streaming: worker_property.is_streaming, is_serving: worker_property.is_serving, is_unschedulable: worker_property.is_unschedulable, + internal_rpc_host_addr: worker_property.internal_rpc_host_addr, }) } else { None @@ -807,6 +808,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) @@ -848,6 +850,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) @@ -870,6 +873,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) @@ -919,6 +923,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) @@ -977,6 +982,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 22c7df751ec78..73a514519c3aa 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -23,7 +23,9 @@ use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::SystemParams; -use risingwave_rpc_client::{StreamClientPool, StreamClientPoolRef}; +use risingwave_rpc_client::{ + FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef, +}; use sea_orm::EntityTrait; use super::{ @@ -123,6 +125,9 @@ pub struct MetaSrvEnv { /// stream client pool memorization. stream_client_pool: StreamClientPoolRef, + /// rpc client pool for frontend nodes. + frontend_client_pool: FrontendClientPoolRef, + /// idle status manager. idle_manager: IdleManagerRef, @@ -385,6 +390,7 @@ impl MetaSrvEnv { ) -> MetaResult { let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms)); let stream_client_pool = Arc::new(StreamClientPool::new(1)); // typically no need for plural clients + let frontend_client_pool = Arc::new(FrontendClientPool::new(1)); let event_log_manager = Arc::new(start_event_log_manager( opts.event_log_enabled, opts.event_log_channel_max_size, @@ -440,6 +446,7 @@ impl MetaSrvEnv { meta_store_impl: meta_store_impl.clone(), notification_manager, stream_client_pool, + frontend_client_pool, idle_manager, event_log_manager, cluster_id, @@ -495,6 +502,7 @@ impl MetaSrvEnv { meta_store_impl: meta_store_impl.clone(), notification_manager, stream_client_pool, + frontend_client_pool, idle_manager, event_log_manager, cluster_id, @@ -559,6 +567,10 @@ impl MetaSrvEnv { self.stream_client_pool.deref() } + pub fn frontend_client_pool(&self) -> &FrontendClientPool { + self.frontend_client_pool.deref() + } + pub fn cluster_id(&self) -> &ClusterId { &self.cluster_id } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index bcf89b5eedb1c..3622e5e7c8504 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -565,6 +565,23 @@ impl MetadataManager { } } + pub async fn get_table_catalog_by_cdc_table_id( + &self, + cdc_table_id: String, + ) -> MetaResult> { + match &self { + MetadataManager::V1(mgr) => Ok(mgr + .catalog_manager + .get_table_by_cdc_table_id(cdc_table_id) + .await), + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .get_table_by_cdc_table_id(cdc_table_id) + .await + } + } + } + pub async fn get_downstream_chain_fragments( &self, job_id: u32, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 880a1b1f334fb..220f69bc0a58d 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -1038,6 +1038,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) diff --git a/src/prost/build.rs b/src/prost/build.rs index e9974d6d87a09..0682a63a02edb 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -54,6 +54,7 @@ fn main() -> Result<(), Box> { "telemetry", "user", "secret", + "frontend_service", ]; let protos: Vec = proto_files .iter() diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index 0b416af39b838..4c4327d049446 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -85,6 +85,9 @@ pub mod monitor_service; #[cfg_attr(madsim, path = "sim/backup_service.rs")] pub mod backup_service; #[rustfmt::skip] +#[cfg_attr(madsim, path = "sim/frontend_service.rs")] +pub mod frontend_service; +#[rustfmt::skip] #[cfg_attr(madsim, path = "sim/java_binding.rs")] pub mod java_binding; #[rustfmt::skip] diff --git a/src/rpc_client/src/error.rs b/src/rpc_client/src/error.rs index c5c5613a32a4b..7ffab0736179e 100644 --- a/src/rpc_client/src/error.rs +++ b/src/rpc_client/src/error.rs @@ -76,4 +76,4 @@ macro_rules! impl_from_status { }; } -impl_from_status!(stream, batch, meta, compute, compactor, connector); +impl_from_status!(stream, batch, meta, compute, compactor, connector, frontend); diff --git a/src/rpc_client/src/frontend_client.rs b/src/rpc_client/src/frontend_client.rs new file mode 100644 index 0000000000000..3fd02d99107e2 --- /dev/null +++ b/src/rpc_client/src/frontend_client.rs @@ -0,0 +1,118 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; +use risingwave_common::monitor::{EndpointExt, TcpConfig}; +use risingwave_common::util::addr::HostAddr; +use risingwave_pb::frontend_service::frontend_service_client::FrontendServiceClient; +use risingwave_pb::frontend_service::{GetTableReplacePlanRequest, GetTableReplacePlanResponse}; +use tokio_retry::strategy::{jitter, ExponentialBackoff}; +use tonic::transport::Endpoint; +use tonic::Response; + +use crate::error::Result; +use crate::tracing::{Channel, TracingInjectedChannelExt}; +use crate::{RpcClient, RpcClientPool}; + +const DEFAULT_RETRY_INTERVAL: u64 = 50; +const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_secs(5); +const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 10; + +#[derive(Clone)] +struct FrontendClient(FrontendServiceClient); + +impl FrontendClient { + async fn new(host_addr: HostAddr) -> Result { + let channel = Endpoint::from_shared(format!("http://{}", &host_addr))? + .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE) + .connect_timeout(Duration::from_secs(5)) + .monitored_connect( + "grpc-frontend-client", + TcpConfig { + tcp_nodelay: true, + keepalive_duration: None, + }, + ) + .await? + .tracing_injected(); + + Ok(Self( + FrontendServiceClient::new(channel).max_decoding_message_size(usize::MAX), + )) + } +} + +// similar to the stream_client used in the Meta node +pub type FrontendClientPool = RpcClientPool; +pub type FrontendClientPoolRef = Arc; + +#[async_trait] +impl RpcClient for FrontendRetryClient { + async fn new_client(host_addr: HostAddr) -> Result { + Self::new(host_addr).await + } +} + +#[derive(Clone)] +pub struct FrontendRetryClient { + client: FrontendClient, +} + +impl FrontendRetryClient { + async fn new(host_addr: HostAddr) -> Result { + let client = FrontendClient::new(host_addr).await?; + Ok(Self { client }) + } + + #[inline(always)] + fn get_retry_strategy() -> impl Iterator { + ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL) + .max_delay(DEFAULT_RETRY_MAX_DELAY) + .take(DEFAULT_RETRY_MAX_ATTEMPTS) + .map(jitter) + } + + fn should_retry(status: &tonic::Status) -> bool { + if status.code() == tonic::Code::Unavailable + || status.code() == tonic::Code::Unknown + || status.code() == tonic::Code::Unauthenticated + || status.code() == tonic::Code::Aborted + { + return true; + } + false + } + + pub async fn get_table_replace_plan( + &self, + request: GetTableReplacePlanRequest, + ) -> std::result::Result, tonic::Status> { + tokio_retry::RetryIf::spawn( + Self::get_retry_strategy(), + || async { + self.client + .to_owned() + .0 + .get_table_replace_plan(request.clone()) + .await + }, + Self::should_retry, + ) + .await + } +} diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index bb1d90dcffbf4..60a4ca537d21c 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -32,6 +32,7 @@ use std::any::type_name; use std::fmt::{Debug, Formatter}; use std::future::Future; use std::iter::repeat; +use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Context}; @@ -42,17 +43,20 @@ use futures::{Stream, StreamExt}; use moka::future::Cache; use rand::prelude::SliceRandom; use risingwave_common::util::addr::HostAddr; -use risingwave_pb::common::WorkerNode; +use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::meta::heartbeat_request::extra_info; use tokio::sync::mpsc::{ channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, }; pub mod error; + use error::Result; + mod compactor_client; mod compute_client; mod connector_client; +mod frontend_client; mod hummock_meta_client; mod meta_client; mod sink_coordinate_client; @@ -62,6 +66,7 @@ mod tracing; pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient}; pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef}; pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle}; +pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef}; pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient}; pub use meta_client::{MetaClient, SinkCoordinationRpcClient}; use rw_futures_util::await_future_with_monitor_error_stream; @@ -127,7 +132,16 @@ where /// Gets the RPC client for the given node. If the connection is not established, a /// new client will be created and returned. pub async fn get(&self, node: &WorkerNode) -> Result { - let addr: HostAddr = node.get_host().unwrap().into(); + let addr = if node.get_type().unwrap() == WorkerType::Frontend { + let prop = node + .property + .as_ref() + .expect("frontend node property is missing"); + HostAddr::from_str(prop.internal_rpc_host_addr.as_str())? + } else { + node.get_host().unwrap().into() + }; + self.get_by_addr(addr).await } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 5a45e0752c9d8..b18acb55aac69 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -245,7 +245,7 @@ impl MetaClient { .add_worker_node(AddWorkerNodeRequest { worker_type: worker_type as i32, host: Some(addr.to_protobuf()), - property: Some(property), + property: Some(property.clone()), resource: Some(risingwave_pb::common::worker_node::Resource { rw_version: RW_VERSION.to_string(), total_memory_bytes: system_memory_available_bytes() as _, @@ -559,6 +559,14 @@ impl MetaClient { Ok(resp.version) } + pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> { + let request = AutoSchemaChangeRequest { + schema_change: Some(schema_change), + }; + let _ = self.inner.auto_schema_change(request).await?; + Ok(()) + } + pub async fn create_view(&self, view: PbView) -> Result { let request = CreateViewRequest { view: Some(view) }; let resp = self.inner.create_view(request).await?; @@ -2030,6 +2038,7 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse } ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse } ,{ ddl_client, wait, WaitRequest, WaitResponse } + ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse } ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse } ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse } ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse } diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index 36598a9ec9e47..baf5c4070c74a 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -553,6 +553,7 @@ mod tests { incoming_sinks: vec![], initialized_at_cluster_version: None, created_at_cluster_version: None, + cdc_table_id: None, } } diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index 1c73a3aeddad6..852514cb16e1c 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -21,6 +21,7 @@ use await_tree::InstrumentAwait; use futures::future::join_all; use hytra::TrAdder; use risingwave_common::catalog::TableId; +use risingwave_common::config::StreamingConfig; use risingwave_common::log::LogSuppresser; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; use risingwave_common::util::epoch::EpochPair; @@ -28,6 +29,7 @@ use risingwave_expr::expr_context::{expr_context_scope, FRAGMENT_ID}; use risingwave_expr::ExprError; use risingwave_pb::plan_common::ExprContext; use risingwave_pb::stream_plan::PbStreamActor; +use risingwave_rpc_client::MetaClient; use thiserror_ext::AsReport; use tokio_stream::StreamExt; use tracing::Instrument; @@ -55,6 +57,11 @@ pub struct ActorContext { pub initial_dispatch_num: usize, // mv_table_id to subscription id pub related_subscriptions: HashMap>, + + // Meta client. currently used for auto schema change. `None` for test only + pub meta_client: Option, + + pub streaming_config: Arc, } pub type ActorContextRef = Arc; @@ -72,6 +79,8 @@ impl ActorContext { // Set 1 for test to enable sanity check on table initial_dispatch_num: 1, related_subscriptions: HashMap::new(), + meta_client: None, + streaming_config: Arc::new(StreamingConfig::default()), }) } @@ -81,6 +90,8 @@ impl ActorContext { streaming_metrics: Arc, initial_dispatch_num: usize, related_subscriptions: HashMap>, + meta_client: Option, + streaming_config: Arc, ) -> ActorContextRef { Arc::new(Self { id: stream_actor.actor_id, @@ -92,6 +103,8 @@ impl ActorContext { streaming_metrics, initial_dispatch_num, related_subscriptions, + meta_client, + streaming_config, }) } diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 788a9a45662cd..13580ca49e001 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -177,6 +177,7 @@ impl FsFetchExecutor { rate_limit: self.rate_limit_rps, }, source_desc.source.config.clone(), + None, ) } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 32d5d533d904a..6754570c4930b 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -108,6 +108,7 @@ impl FsSourceExecutor { rate_limit: self.rate_limit_rps, }, source_desc.source.config.clone(), + None, ); let stream = source_desc .source diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 4a34eabe97e16..8351e48023a05 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -216,6 +216,7 @@ impl SourceBackfillExecutorInner { rate_limit: self.rate_limit_rps, }, source_desc.source.config.clone(), + None, ); let stream = source_desc .source diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index c550c0e12f030..6c951a8a28110 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -24,6 +24,7 @@ use risingwave_common::metrics::{LabelGuardedIntCounter, GLOBAL_ERROR_METRICS}; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::{Epoch, EpochPair}; +use risingwave_connector::parser::schema_change::SchemaChangeEnvelope; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::reader::reader::SourceReader; use risingwave_connector::source::{ @@ -32,8 +33,8 @@ use risingwave_connector::source::{ }; use risingwave_hummock_sdk::HummockReadEpoch; use thiserror_ext::AsReport; -use tokio::sync::mpsc; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::{mpsc, oneshot}; use tokio::time::Instant; use super::executor_core::StreamSourceCore; @@ -122,6 +123,43 @@ impl SourceExecutor { .iter() .map(|column_desc| column_desc.column_id) .collect_vec(); + + let (schema_change_tx, mut schema_change_rx) = + tokio::sync::mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16); + let schema_change_tx = if self.is_auto_schema_change_enable() { + let meta_client = self.actor_ctx.meta_client.clone(); + // spawn a task to handle schema change event from source parser + let _join_handle = tokio::task::spawn(async move { + while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await { + let table_names = schema_change.table_names(); + tracing::info!("recv a schema change event for tables: {:?}", table_names); + // TODO: retry on rpc error + if let Some(ref meta_client) = meta_client { + match meta_client + .auto_schema_change(schema_change.to_protobuf()) + .await + { + Ok(_) => { + tracing::info!( + "schema change success for tables: {:?}", + table_names + ); + finish_tx.send(()).unwrap(); + } + Err(e) => { + tracing::error!(error = ?e.as_report(), "schema change error"); + finish_tx.send(()).unwrap(); + } + } + } + } + }); + Some(schema_change_tx) + } else { + info!("auto schema change is disabled in config"); + None + }; + let source_ctx = SourceContext::new( self.actor_ctx.id, self.stream_source_core.as_ref().unwrap().source_id, @@ -137,6 +175,7 @@ impl SourceExecutor { rate_limit: self.rate_limit_rps, }, source_desc.source.config.clone(), + schema_change_tx, ); let stream = source_desc .source @@ -147,6 +186,13 @@ impl SourceExecutor { Ok(apply_rate_limit(stream?, self.rate_limit_rps).boxed()) } + fn is_auto_schema_change_enable(&self) -> bool { + self.actor_ctx + .streaming_config + .developer + .enable_auto_schema_change + } + /// `source_id | source_name | actor_id | fragment_id` #[inline] fn get_metric_labels(&self) -> [String; 4] { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 5a5b2d48d57c4..dac104156f158 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -577,6 +577,7 @@ impl StreamActorManager { } = actor; let actor = actor.unwrap(); let actor_id = actor.actor_id; + let streaming_config = self.env.config().clone(); let actor_context = ActorContext::create( &actor, self.env.total_mem_usage(), @@ -591,6 +592,8 @@ impl StreamActorManager { ) }) .collect(), + self.env.meta_client().clone(), + streaming_config, ); let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into()); let expr_context = actor.expr_context.clone().unwrap(); diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 2d272823d6058..13df85bf25d97 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -158,6 +158,7 @@ async fn compaction_test( incoming_sinks: vec![], initialized_at_cluster_version: None, created_at_cluster_version: None, + cdc_table_id: None, }; let mut delete_range_table = delete_key_table.clone(); delete_range_table.id = 2; diff --git a/src/tests/simulation/tests/integration_tests/sink/scale.rs b/src/tests/simulation/tests/integration_tests/sink/scale.rs index b1c533b21cab3..c89489bf559a8 100644 --- a/src/tests/simulation/tests/integration_tests/sink/scale.rs +++ b/src/tests/simulation/tests/integration_tests/sink/scale.rs @@ -151,8 +151,16 @@ async fn scale_test_inner(is_decouple: bool) -> Result<()> { Ok(()) } +fn init_logger() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_ansi(false) + .try_init(); +} + #[tokio::test] async fn test_sink_scale() -> Result<()> { + init_logger(); scale_test_inner(false).await } diff --git a/src/utils/pgwire/src/pg_server.rs b/src/utils/pgwire/src/pg_server.rs index 4b0b8657ac59f..99cbd58bc20aa 100644 --- a/src/utils/pgwire/src/pg_server.rs +++ b/src/utils/pgwire/src/pg_server.rs @@ -47,6 +47,14 @@ pub type SessionId = (ProcessId, SecretKey); pub trait SessionManager: Send + Sync + 'static { type Session: Session; + /// In the process of auto schema change, we need a dummy session to access + /// catalog information in frontend and build a replace plan for the table. + fn create_dummy_session( + &self, + database_id: u32, + user_id: u32, + ) -> Result, BoxedError>; + fn connect( &self, database: &str, @@ -257,7 +265,7 @@ impl UserAuthenticator { /// Returns when the `shutdown` token is triggered. pub async fn pg_serve( addr: &str, - session_mgr: impl SessionManager, + session_mgr: Arc, tls_config: Option, redact_sql_option_keywords: Option, shutdown: CancellationToken, @@ -280,7 +288,6 @@ pub async fn pg_serve( #[cfg(madsim)] let worker_runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); - let session_mgr = Arc::new(session_mgr); let session_mgr_clone = session_mgr.clone(); let f = async move { loop { @@ -380,6 +387,14 @@ mod tests { impl SessionManager for MockSessionManager { type Session = MockSession; + fn create_dummy_session( + &self, + _database_id: u32, + _user_name: u32, + ) -> Result, BoxedError> { + unimplemented!() + } + fn connect( &self, _database: &str, @@ -519,7 +534,7 @@ mod tests { tokio::spawn(async move { pg_serve( &bind_addr, - session_mgr, + Arc::new(session_mgr), None, None, CancellationToken::new(), // dummy