diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 57aedc0e1490f..3e32b0261fbab 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -85,6 +85,7 @@ pub async fn handle_alter_parallelism( session.check_privilege_for_drop_alter(schema_name, &**sink)?; sink.id.sink_id() } + // TODO: support alter parallelism for shared source _ => bail!( "invalid statement type for alter parallelism: {:?}", stmt_type diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index d7b90a03172d3..dda8bda4675d1 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -14,6 +14,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::max_column_id; use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct}; use risingwave_sqlparser::ast::{ @@ -59,11 +60,15 @@ pub async fn handle_alter_source_column( }; if catalog.associated_table_id.is_some() { - Err(ErrorCode::NotSupported( + return Err(ErrorCode::NotSupported( "alter table with connector with ALTER SOURCE statement".to_string(), "try to use ALTER TABLE instead".to_string(), - ))? + ) + .into()); }; + if catalog.info.is_shared() { + bail_not_implemented!(issue = 123, "alter shared source"); + } // Currently only allow source without schema registry let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?; diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 25de847fca3f9..7d82a89a64e08 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -228,6 +228,9 @@ pub async fn handle_alter_source_with_sr( ) .into()); }; + if source.info.is_shared() { + bail_not_implemented!(issue = 123, "alter shared source"); + } check_format_encode(&source, &connector_schema)?; diff --git a/src/meta/model_v2/src/source.rs b/src/meta/model_v2/src/source.rs index 39c7dc556cf3e..af006b06e8bb5 100644 --- a/src/meta/model_v2/src/source.rs +++ b/src/meta/model_v2/src/source.rs @@ -109,3 +109,10 @@ impl From for ActiveModel { } } } + +impl Model { + pub fn is_shared(&self) -> bool { + self.source_info + .is_some_and(|s| s.to_protobuf().is_shared()) + } +} diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 273c45b8d2aa9..55342a0b3bf32 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -122,6 +122,7 @@ impl StreamManagerService for StreamServiceImpl { } }; + // TODO: check whether shared source is correct let mutation: ThrottleConfig = actor_to_apply .iter() .map(|(fragment_id, actors)| { diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 23a6a9d2f32dc..c694290c9b69d 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -1670,6 +1670,16 @@ impl CatalogController { .await? .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?; relations.push(PbRelationInfo::Source(ObjectModel(source, obj).into())); + + // Note: For non-shared source, we don't update their state tables, which + // belongs to the MV. + if source.is_shared() { + update_internal_tables( + object_id, + object::Column::OwnerId, + Value::Int(Some(new_owner)), + ) + } } ObjectType::Sink => { let sink = Sink::find_by_id(object_id) @@ -1678,34 +1688,11 @@ impl CatalogController { .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?; relations.push(PbRelationInfo::Sink(ObjectModel(sink, obj).into())); - // internal tables. - let internal_tables: Vec = Table::find() - .select_only() - .column(table::Column::TableId) - .filter(table::Column::BelongsToJobId.eq(object_id)) - .into_tuple() - .all(&txn) - .await?; - - Object::update_many() - .col_expr( - object::Column::OwnerId, - SimpleExpr::Value(Value::Int(Some(new_owner))), - ) - .filter(object::Column::Oid.is_in(internal_tables.clone())) - .exec(&txn) - .await?; - - let table_objs = Table::find() - .find_also_related(Object) - .filter(table::Column::TableId.is_in(internal_tables)) - .all(&txn) - .await?; - for (table, table_obj) in table_objs { - relations.push(PbRelationInfo::Table( - ObjectModel(table, table_obj.unwrap()).into(), - )); - } + update_internal_tables( + object_id, + object::Column::OwnerId, + Value::Int(Some(new_owner)), + ) } ObjectType::Subscription => { let subscription = Subscription::find_by_id(object_id) @@ -1888,6 +1875,16 @@ impl CatalogController { obj.schema_id = Set(Some(new_schema)); let obj = obj.update(&txn).await?; relations.push(PbRelationInfo::Source(ObjectModel(source, obj).into())); + + // Note: For non-shared source, we don't update their state tables, which + // belongs to the MV. + if source.is_shared() { + update_internal_tables( + object_id, + object::Column::SchemaId, + Value::Int(Some(new_schema)), + )?; + } } ObjectType::Sink => { let sink = Sink::find_by_id(object_id) @@ -1901,36 +1898,11 @@ impl CatalogController { let obj = obj.update(&txn).await?; relations.push(PbRelationInfo::Sink(ObjectModel(sink, obj).into())); - // internal tables. - let internal_tables: Vec = Table::find() - .select_only() - .column(table::Column::TableId) - .filter(table::Column::BelongsToJobId.eq(object_id)) - .into_tuple() - .all(&txn) - .await?; - - if !internal_tables.is_empty() { - Object::update_many() - .col_expr( - object::Column::SchemaId, - SimpleExpr::Value(Value::Int(Some(new_schema))), - ) - .filter(object::Column::Oid.is_in(internal_tables.clone())) - .exec(&txn) - .await?; - - let table_objs = Table::find() - .find_also_related(Object) - .filter(table::Column::TableId.is_in(internal_tables)) - .all(&txn) - .await?; - for (table, table_obj) in table_objs { - relations.push(PbRelationInfo::Table( - ObjectModel(table, table_obj.unwrap()).into(), - )); - } - } + update_internal_tables( + object_id, + object::Column::SchemaId, + Value::Int(Some(new_schema)), + )?; } ObjectType::Subscription => { let subscription = Subscription::find_by_id(object_id) @@ -2452,6 +2424,7 @@ impl CatalogController { }}; } + // TODO: check is there any thing to change for shared source? let old_name = match object_type { ObjectType::Table => rename_relation!(Table, table, table_id, object_id), ObjectType::Source => rename_relation!(Source, source, source_id, object_id), @@ -3395,6 +3368,39 @@ impl CatalogControllerInner { } } +async fn update_internal_tables( + object_id: i32, + column: object::Column, + new_value: Value, +) -> MetaResult<()> { + let internal_tables: Vec = Table::find() + .select_only() + .column(table::Column::TableId) + .filter(table::Column::BelongsToJobId.eq(object_id)) + .into_tuple() + .all(&txn) + .await?; + + if !internal_tables.is_empty() { + Object::update_many() + .col_expr(column, SimpleExpr::Value(new_value)) + .filter(object::Column::Oid.is_in(internal_tables.clone())) + .exec(&txn) + .await?; + + let table_objs = Table::find() + .find_also_related(Object) + .filter(table::Column::TableId.is_in(internal_tables)) + .all(&txn) + .await?; + for (table, table_obj) in table_objs { + relations.push(PbRelationInfo::Table( + ObjectModel(table, table_obj.unwrap()).into(), + )); + } + } +} + #[cfg(test)] #[cfg(not(madsim))] mod tests {