diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index a312294896a13..da05f65659cde 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -116,7 +116,7 @@ message BarrierMutation { // Stop a set of actors, used for dropping materialized views. Empty dispatchers will be // automatically removed. StopMutation stop = 4; - // Update outputs and hash mappings for some dispatchers, used for scaling. + // Update outputs and hash mappings for some dispatchers, used for scaling and replace table. UpdateMutation update = 5; // Change the split of some sources. SourceChangeSplitMutation splits = 6; @@ -124,11 +124,12 @@ message BarrierMutation { PauseMutation pause = 7; // Resume the dataflow of the whole streaming graph, only used for scaling. ResumeMutation resume = 8; - // Throttle specific source exec or chain exec. + // Throttle specific source exec or backfill exec. ThrottleMutation throttle = 10; // Drop subscription on mv DropSubscriptionsMutation drop_subscriptions = 12; // Combined mutation. + // Currently, it can only be Add & Update, which is for sink into table. CombinedMutation combined = 100; } } diff --git a/src/meta/model/src/prelude.rs b/src/meta/model/src/prelude.rs index b17eae112aef0..86f1538a11abb 100644 --- a/src/meta/model/src/prelude.rs +++ b/src/meta/model/src/prelude.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub use {Source as SourceModel, Table as TableModel}; + pub use super::actor::Entity as Actor; pub use super::actor_dispatcher::Entity as ActorDispatcher; pub use super::catalog_version::Entity as CatalogVersion; diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index ac7cf70642058..0bb5a42aa6959 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -247,7 +247,7 @@ impl DdlService for DdlServiceImpl { None => { let version = self .ddl_controller - .run_command(DdlCommand::CreateSourceWithoutStreamingJob(source)) + .run_command(DdlCommand::CreateNonSharedSource(source)) .await?; Ok(Response::new(CreateSourceResponse { status: None, @@ -699,10 +699,10 @@ impl DdlService for DdlServiceImpl { &self, request: Request, ) -> Result, Status> { - let AlterSourceRequest { source } = request.into_inner(); + let AlterSourceRequest { source, plan } = request.into_inner(); let version = self .ddl_controller - .run_command(DdlCommand::AlterSourceColumn(source.unwrap())) + .run_command(DdlCommand::AlterNonSharedSource(source.unwrap(), plan)) .await?; Ok(Response::new(AlterSourceResponse { status: None, diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 8e1f20eb63e08..d315587546c32 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -213,8 +213,8 @@ pub enum CreateStreamingJobType { } /// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands, -/// it will build different barriers to send, and may do different stuffs after the barrier is -/// collected. +/// it will [build different barriers to send](Self::to_mutation), +/// and may [do different stuffs after the barrier is collected](CommandContext::post_collect). #[derive(Debug, strum::Display)] pub enum Command { /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed @@ -595,6 +595,7 @@ impl Command { Command::Pause(_) => { // Only pause when the cluster is not already paused. 
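The `Command::Pause` arm above only emits a `PauseMutation` when the cluster is not already paused; the `XXX` comment that follows asks how interleaved pause/resume pairs with different reasons behave. A minimal standalone sketch of that idempotency rule, using simplified stand-in types rather than the real RisingWave definitions:

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PauseReason {
    ConfigChange,
    Manual,
}

struct PauseMutation;

/// Emit a pause mutation only if nothing has paused the graph yet.
/// In this toy model a second `pause` while already paused is swallowed and its
/// reason is dropped, which is the corner case the XXX comment flags.
fn pause(current: &mut Option<PauseReason>, reason: PauseReason) -> Option<PauseMutation> {
    if current.is_none() {
        *current = Some(reason);
        Some(PauseMutation)
    } else {
        None
    }
}

fn main() {
    let mut paused = None;
    assert!(pause(&mut paused, PauseReason::ConfigChange).is_some());
    assert!(pause(&mut paused, PauseReason::Manual).is_none()); // no-op: already paused
}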
+ // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)?? if current_paused_reason.is_none() { Some(Mutation::Pause(PauseMutation {})) } else { @@ -699,7 +700,6 @@ impl Command { .. }) = job_type { - // TODO: support in v2. let update = Self::generate_update_mutation_for_replace_table( old_fragments, merge_updates, diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 3a788438a0c14..1795e39d18476 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -2730,7 +2730,10 @@ impl CatalogController { .collect()) } - pub async fn alter_source(&self, pb_source: PbSource) -> MetaResult { + pub async fn alter_non_shared_source( + &self, + pb_source: PbSource, + ) -> MetaResult { let source_id = pb_source.id as SourceId; let inner = self.inner.write().await; let txn = inner.db.begin().await?; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index b8e12afdf982b..73ff00d4c506e 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -24,7 +24,7 @@ use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::object::ObjectType; -use risingwave_meta_model::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; +use risingwave_meta_model::prelude::{Actor, Fragment, Sink, StreamingJob}; use risingwave_meta_model::{ actor, actor_dispatcher, fragment, object, sink, source, streaming_job, table, ActorId, ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus, @@ -46,7 +46,7 @@ use risingwave_pb::meta::{ use risingwave_pb::source::PbConnectorSplits; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ - DispatchStrategy, PbDispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, + DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, }; use sea_orm::sea_query::Expr; use sea_orm::ActiveValue::Set; @@ -1346,29 +1346,41 @@ impl CatalogController { Ok(actors) } - pub async fn get_upstream_root_fragments( + /// Get and filter the "**root**" fragments of the specified jobs. + /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`. + /// + /// Root fragment connects to downstream jobs. + /// + /// ## What can be the root fragment + /// - For MV, it should have one `MView` fragment. + /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root. + /// - For source, it should have one `Source` fragment. + /// + /// In other words, it's the `MView` fragment if it exists, otherwise it's the `Source` fragment. 
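The doc comment above says the root fragment is the `MView` fragment when one exists and the `Source` fragment otherwise. A standalone sketch of that selection rule; the type-flag constants and structs are simplified stand-ins, and `entry().or_insert` plays the role of the nightly `HashMap::try_insert` used in the patch:

use std::collections::HashMap;

// Simplified stand-ins for the fragment type flags (bit positions are illustrative).
const MVIEW: i32 = 1 << 0;
const SOURCE: i32 = 1 << 1;

#[derive(Debug, Clone)]
struct Fragment {
    job_id: i32,
    fragment_type_mask: i32,
}

/// Prefer the `MView` fragment as the root; fall back to `Source` only when
/// no `MView` fragment exists for that job.
fn root_fragments(fragments: Vec<Fragment>) -> HashMap<i32, Fragment> {
    let mut roots: HashMap<i32, Fragment> = HashMap::new();
    for f in fragments {
        if f.fragment_type_mask & MVIEW != 0 {
            // An MView fragment always wins, even if a Source fragment was seen first.
            roots.insert(f.job_id, f);
        } else if f.fragment_type_mask & SOURCE != 0 {
            // Keep the Source fragment only as a fallback; never overwrite an MView.
            roots.entry(f.job_id).or_insert(f);
        }
    }
    roots
}

Downstream jobs attach to the `MView` fragment when both kinds exist, which is why it always overrides a previously recorded `Source` fragment.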
+ pub async fn get_root_fragments( &self, - upstream_job_ids: Vec, + job_ids: Vec, ) -> MetaResult<(HashMap, Vec<(ActorId, WorkerId)>)> { let inner = self.inner.read().await; let all_upstream_fragments = Fragment::find() - .filter(fragment::Column::JobId.is_in(upstream_job_ids)) + .filter(fragment::Column::JobId.is_in(job_ids)) .all(&inner.db) .await?; // job_id -> fragment - let mut fragments = HashMap::::new(); + let mut root_fragments = HashMap::::new(); for fragment in all_upstream_fragments { if fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 { - _ = fragments.insert(fragment.job_id, fragment); + _ = root_fragments.insert(fragment.job_id, fragment); } else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 { - // look for Source fragment if there's no MView fragment - _ = fragments.try_insert(fragment.job_id, fragment); + // look for Source fragment only if there's no MView fragment + // (notice try_insert here vs insert above) + _ = root_fragments.try_insert(fragment.job_id, fragment); } } - let mut root_fragments = HashMap::new(); - for (_, fragment) in fragments { + let mut root_fragments_pb = HashMap::new(); + for (_, fragment) in root_fragments { let actors = fragment.find_related(Actor).all(&inner.db).await?; let actor_dispatchers = get_actor_dispatchers( &inner.db, @@ -1376,7 +1388,7 @@ impl CatalogController { ) .await?; - root_fragments.insert( + root_fragments_pb.insert( fragment.job_id, Self::compose_fragment(fragment, actors, actor_dispatchers)?.0, ); @@ -1389,35 +1401,34 @@ impl CatalogController { .all(&inner.db) .await?; - Ok((root_fragments, actors)) + Ok((root_fragments_pb, actors)) } - /// Get the downstream `Chain` fragments of the specified table. - pub async fn get_downstream_chain_fragments( + pub async fn get_root_fragment( + &self, + job_id: ObjectId, + ) -> MetaResult<(PbFragment, Vec<(ActorId, WorkerId)>)> { + let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?; + let root_fragment = root_fragments + .remove(&job_id) + .context(format!("root fragment for job {} not found", job_id))?; + Ok((root_fragment, actors)) + } + + /// Get the downstream fragments connected to the specified job. 
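The new `get_downstream_fragments` below reads the downstream fragments straight off the root fragment's dispatchers (via the `dispatches()` helper added to `src/prost` later in this patch) instead of rebuilding the strategies by hand. A rough standalone model of that projection, keyed by downstream fragment id; the structs are trimmed-down stand-ins for the proto messages:

use std::collections::HashMap;

type FragmentId = u32;

#[derive(Clone)]
struct Dispatcher {
    dispatcher_id: u64, // id of the downstream fragment this dispatcher feeds
    r#type: i32,
    dist_key_indices: Vec<u32>,
    output_indices: Vec<u32>,
}

#[derive(Clone, Debug)]
struct DispatchStrategy {
    r#type: i32,
    dist_key_indices: Vec<u32>,
    output_indices: Vec<u32>,
}

/// One dispatch strategy per downstream fragment, read off the root fragment's
/// dispatchers. Looking at a single actor's dispatcher list is enough only if
/// all actors of the fragment share the same dispatchers -- the assumption
/// behind `actors[0]` in the patch.
fn dispatches(dispatchers: &[Dispatcher]) -> HashMap<FragmentId, DispatchStrategy> {
    dispatchers
        .iter()
        .map(|d| {
            (
                d.dispatcher_id as FragmentId,
                DispatchStrategy {
                    r#type: d.r#type,
                    dist_key_indices: d.dist_key_indices.clone(),
                    output_indices: d.output_indices.clone(),
                },
            )
        })
        .collect()
}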
+ pub async fn get_downstream_fragments( &self, job_id: ObjectId, ) -> MetaResult<( Vec<(DispatchStrategy, PbFragment)>, Vec<(ActorId, WorkerId)>, )> { - let mview_fragment = self.get_mview_fragment(job_id).await?; - let downstream_dispatches: HashMap<_, _> = mview_fragment.actors[0] - .dispatcher - .iter() - .map(|d| { - let fragment_id = d.dispatcher_id as FragmentId; - let strategy = PbDispatchStrategy { - r#type: d.r#type, - dist_key_indices: d.dist_key_indices.clone(), - output_indices: d.output_indices.clone(), - }; - (fragment_id, strategy) - }) - .collect(); + let (root_fragment, actors) = self.get_root_fragment(job_id).await?; + let dispatches = root_fragment.dispatches(); let inner = self.inner.read().await; - let mut chain_fragments = vec![]; - for (fragment_id, dispatch_strategy) in downstream_dispatches { + let mut downstream_fragments = vec![]; + for (fragment_id, dispatch_strategy) in dispatches { let mut fragment_actors = Fragment::find_by_id(fragment_id) .find_with_related(Actor) .all(&inner.db) @@ -1433,17 +1444,10 @@ impl CatalogController { ) .await?; let fragment = Self::compose_fragment(fragment, actors, actor_dispatchers)?.0; - chain_fragments.push((dispatch_strategy, fragment)); + downstream_fragments.push((dispatch_strategy, fragment)); } - let actors: Vec<(ActorId, WorkerId)> = Actor::find() - .select_only() - .columns([actor::Column::ActorId, actor::Column::WorkerId]) - .into_tuple() - .all(&inner.db) - .await?; - - Ok((chain_fragments, actors)) + Ok((downstream_fragments, actors)) } pub async fn load_source_fragment_ids( @@ -1516,35 +1520,6 @@ impl CatalogController { Ok(splits.into_iter().collect()) } - /// Get the `Materialize` fragment of the specified table. - pub async fn get_mview_fragment(&self, job_id: ObjectId) -> MetaResult { - let inner = self.inner.read().await; - let mut fragments = Fragment::find() - .filter(fragment::Column::JobId.eq(job_id)) - .all(&inner.db) - .await?; - fragments.retain(|f| f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0); - if fragments.is_empty() { - bail!("No mview fragment found for job {}", job_id); - } - assert_eq!(fragments.len(), 1); - - let fragment = fragments.pop().unwrap(); - let actor_with_dispatchers = fragment - .find_related(Actor) - .find_with_related(ActorDispatcher) - .all(&inner.db) - .await?; - let mut actors = vec![]; - let mut actor_dispatchers = HashMap::new(); - for (actor, dispatchers) in actor_with_dispatchers { - actor_dispatchers.insert(actor.actor_id, dispatchers); - actors.push(actor); - } - - Ok(Self::compose_fragment(fragment, actors, actor_dispatchers)?.0) - } - /// Get the actor count of `Materialize` or `Sink` fragment of the specified table. pub async fn get_actual_job_fragment_parallelism( &self, diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index eb8f79f80e642..8955f9aa127b0 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -416,6 +416,7 @@ impl CatalogController { // TODO: In this function, we also update the `Table` model in the meta store. // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider // making them the source of truth and performing a full replacement for those in the meta store? + /// Insert fragments and actors to meta store. Used both for creating new jobs and replacing jobs. 
pub async fn prepare_streaming_job( &self, stream_job_fragments: &StreamJobFragments, @@ -481,7 +482,7 @@ impl CatalogController { } if !for_replace { - // // Update dml fragment id. + // Update dml fragment id. if let StreamingJob::Table(_, table, ..) = streaming_job { Table::update(table::ActiveModel { table_id: Set(table.id as _), @@ -1008,7 +1009,7 @@ impl CatalogController { Ok(version) } - /// TODO: make it general for other streaming jobs. + /// TODO(alter-source): make it general for other streaming jobs. /// Currently only for replacing table. pub async fn finish_replace_streaming_job_inner( tmp_id: ObjectId, @@ -1025,6 +1026,7 @@ impl CatalogController { let original_job_id = streaming_job.id() as ObjectId; let job_type = streaming_job.job_type(); + // Update catalog match streaming_job { StreamingJob::Table(_source, table, _table_job_type) => { // The source catalog should remain unchanged @@ -1063,7 +1065,11 @@ impl CatalogController { table.incoming_sinks = Set(incoming_sinks.into()); table.update(txn).await?; } - // TODO: support other streaming jobs + StreamingJob::Source(source) => { + // Update the source catalog with the new one. + let mut source = source::ActiveModel::from(source); + source.update(txn).await?; + } _ => unreachable!( "invalid streaming job type: {:?}", streaming_job.job_type_str() @@ -1221,8 +1227,21 @@ impl CatalogController { )), }) } - _ => unreachable!("invalid streaming job type: {:?}", job_type), + StreamingJobType::Source => { + let (source, source_obj) = Source::find_by_id(original_job_id) + .find_also_related(Object) + .one(txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?; + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Source( + ObjectModel(source, source_obj.unwrap()).into(), + )), + }) + } + _ => unreachable!("invalid streaming job type for replace: {:?}", job_type), } + // TODO(alter-source) what is this if let Some(table_col_index_mapping) = table_col_index_mapping { let expr_rewriter = ReplaceTableExprRewriter { table_col_index_mapping, diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 25725909bace7..0cb59129b0520 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -426,7 +426,7 @@ impl MetadataManager { ) -> MetaResult<(HashMap, HashMap)> { let (upstream_root_fragments, actors) = self .catalog_controller - .get_upstream_root_fragments( + .get_root_fragments( upstream_table_ids .iter() .map(|id| id.table_id as _) @@ -496,7 +496,7 @@ impl MetadataManager { .await } - pub async fn get_downstream_chain_fragments( + pub async fn get_downstream_fragments( &self, job_id: u32, ) -> MetaResult<( @@ -505,7 +505,7 @@ impl MetadataManager { )> { let (fragments, actors) = self .catalog_controller - .get_downstream_chain_fragments(job_id as _) + .get_downstream_fragments(job_id as _) .await?; let actors = actors diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 3dbf1fc7bf053..b91c3f5e9bd01 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -18,8 +18,8 @@ use risingwave_common::catalog::TableVersionId; use risingwave_common::util::epoch::Epoch; use risingwave_common::{bail_not_implemented, current_cluster_version}; use risingwave_meta_model::object::ObjectType; -use risingwave_meta_model::prelude::Table as TableModel; -use risingwave_meta_model::{table, TableId, TableVersion}; +use risingwave_meta_model::prelude::{SourceModel, 
TableModel}; +use risingwave_meta_model::{source, table, SourceId, TableId, TableVersion}; use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table}; use risingwave_pb::ddl_service::TableJobType; use sea_orm::entity::prelude::*; @@ -30,7 +30,7 @@ use super::{ get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source, get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source, }; -use crate::model::FragmentId; +use crate::stream::StreamFragmentGraph; use crate::{MetaError, MetaResult}; // This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and @@ -162,16 +162,6 @@ impl StreamingJob { } } - /// Set the fragment id where the table is materialized. - pub fn set_table_fragment_id(&mut self, id: FragmentId) { - match self { - Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => { - table.fragment_id = id; - } - Self::Sink(_, _) | Self::Source(_) => {} - } - } - /// Set the vnode count of the table. pub fn set_table_vnode_count(&mut self, vnode_count: usize) { match self { @@ -182,14 +172,17 @@ impl StreamingJob { } } - /// Set the fragment id where the table dml is received. - pub fn set_dml_fragment_id(&mut self, id: Option) { + /// Add some info which is only available in fragment graph to the catalog. + pub fn set_info_from_graph(&mut self, graph: &StreamFragmentGraph) { match self { Self::Table(_, table, ..) => { - table.dml_fragment_id = id; + table.fragment_id = graph.table_fragment_id(); + table.dml_fragment_id = graph.dml_fragment_id(); } - Self::MaterializedView(_) | Self::Index(_, _) | Self::Sink(_, _) => {} - Self::Source(_) => {} + Self::MaterializedView(table) | Self::Index(_, table) => { + table.fragment_id = graph.table_fragment_id(); + } + Self::Sink(_, _) | Self::Source(_) => {} } } @@ -388,10 +381,23 @@ impl StreamingJob { return Err(MetaError::permission_denied("table version is stale")); } } + StreamingJob::Source(source) => { + let new_version = source.get_version(); + let original_version: Option = SourceModel::find_by_id(id as SourceId) + .select_only() + .column(source::Column::Version) + .into_tuple() + .one(txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?; + let original_version = original_version.expect("version for source should exist"); + if new_version != original_version as u64 + 1 { + return Err(MetaError::permission_denied("source version is stale")); + } + } StreamingJob::MaterializedView(_) | StreamingJob::Sink(_, _) - | StreamingJob::Index(_, _) - | StreamingJob::Source(_) => { + | StreamingJob::Index(_, _) => { bail_not_implemented!("schema change for {}", self.job_type_str()) } } diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 12c06325fd99e..dd5aa72a16c32 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -18,7 +18,7 @@ use std::ops::AddAssign; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::{VirtualNode, WorkerSlotId}; -use risingwave_common::util::stream_graph_visitor; +use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node}; use risingwave_connector::source::SplitImpl; use risingwave_meta_model::{SourceId, WorkerId}; use risingwave_pb::catalog::Table; @@ -450,6 +450,36 @@ impl StreamJobFragments { Ok(source_backfill_fragments) } + /// Find the table job's `Union` fragment. + /// Panics if not found. 
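The `StreamingJob::Source` arm added to the version check above mirrors the existing table rule: the incoming catalog version must be exactly one greater than the stored one, otherwise the command is rejected as stale. A stripped-down sketch of that optimistic check, with a placeholder error type:

/// Reject a schema change built against an outdated catalog snapshot.
fn verify_source_version(new_version: u64, stored_version: u64) -> Result<(), String> {
    if new_version != stored_version + 1 {
        return Err(format!(
            "source version is stale: expected {}, got {}",
            stored_version + 1,
            new_version
        ));
    }
    Ok(())
}

fn main() {
    assert!(verify_source_version(5, 4).is_ok());
    // Someone else already bumped the catalog to 5; this plan was built against 3.
    assert!(verify_source_version(4, 5).is_err());
}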
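A standalone sketch of the lookup that `union_fragment_for_table` performs below: walk every fragment's node tree, require that exactly one fragment contains a `Union` node, and return it. The node and fragment types here are heavily simplified stand-ins for the real stream plan:

use std::collections::HashMap;

type FragmentId = u32;

// Toy stream node: either a Union or something else, with children.
enum Node {
    Union(Vec<Node>),
    Other(Vec<Node>),
}

fn contains_union(node: &Node) -> bool {
    match node {
        Node::Union(_) => true,
        Node::Other(children) => children.iter().any(contains_union),
    }
}

/// Find the single fragment whose plan contains a `Union` node.
/// Panics if none is found or if it is not unique, mirroring the
/// `expect`/`assert_eq!` in the patch.
fn union_fragment_id(fragments: &HashMap<FragmentId, Node>) -> FragmentId {
    let mut found = None;
    for (&id, root) in fragments {
        if contains_union(root) {
            if let Some(prev) = found {
                assert_eq!(prev, id, "the union fragment should be unique");
            } else {
                found = Some(id);
            }
        }
    }
    found.expect("fragment of placeholder merger not found")
}

fn main() {
    let fragments = HashMap::from([
        (1, Node::Other(vec![])),
        (2, Node::Other(vec![Node::Union(vec![])])),
    ]);
    assert_eq!(union_fragment_id(&fragments), 2);
}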
+ pub fn union_fragment_for_table(&mut self) -> &mut Fragment { + let mut union_fragment_id = None; + for (fragment_id, fragment) in &mut self.fragments { + for actor in &mut fragment.actors { + if let Some(node) = &mut actor.nodes { + visit_stream_node(node, |body| { + if let NodeBody::Union(_) = body { + if let Some(union_fragment_id) = union_fragment_id.as_mut() { + // The union fragment should be unique. + assert_eq!(*union_fragment_id, *fragment_id); + } else { + union_fragment_id = Some(*fragment_id); + } + } + }) + }; + } + } + + let union_fragment_id = + union_fragment_id.expect("fragment of placeholder merger not found"); + let union_fragment = self + .fragments + .get_mut(&union_fragment_id) + .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id)); + union_fragment + } + /// Resolve dependent table fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap) { let table_id = match stream_node.node_body.as_ref() { @@ -601,13 +631,6 @@ impl StreamJobFragments { }); self } - - /// Panics if the fragment is not found. - pub fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut Fragment { - self.fragments - .get_mut(&fragment_id) - .unwrap_or_else(|| panic!("fragment {} not found", fragment_id)) - } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index e56598839bf25..6525a0bc20829 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -29,7 +29,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::{ visit_stream_node, visit_stream_node_cont_mut, }; -use risingwave_common::{bail, hash, must_match}; +use risingwave_common::{bail, bail_not_implemented, hash, must_match}; use risingwave_connector::connector_common::validate_connection; use risingwave_connector::error::ConnectorError; use risingwave_connector::source::{ @@ -48,7 +48,7 @@ use risingwave_pb::catalog::{ use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_name_request, alter_set_schema_request, alter_swap_rename_request, DdlProgress, - TableJobType, WaitVersion, + PbReplaceStreamingJobPlan, TableJobType, WaitVersion, }; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; @@ -129,7 +129,7 @@ pub enum DdlCommand { DropDatabase(DatabaseId), CreateSchema(Schema), DropSchema(SchemaId), - CreateSourceWithoutStreamingJob(Source), + CreateNonSharedSource(Source), DropSource(SourceId, DropMode), CreateFunction(Function), DropFunction(FunctionId), @@ -146,7 +146,7 @@ pub enum DdlCommand { AlterName(alter_name_request::Object, String), AlterSwapRename(alter_swap_rename_request::Object), ReplaceTable(ReplaceStreamJobInfo), - AlterSourceColumn(Source), + AlterNonSharedSource(Source, Option), AlterObjectOwner(Object, UserId), AlterSetSchema(alter_set_schema_request::Object, SchemaId), CreateConnection(Connection), @@ -184,9 +184,9 @@ impl DdlCommand { | DdlCommand::AlterSecret(_) | DdlCommand::AlterSwapRename(_) => true, DdlCommand::CreateStreamingJob(_, _, _, _, _) - | DdlCommand::CreateSourceWithoutStreamingJob(_) + | DdlCommand::CreateNonSharedSource(_) | DdlCommand::ReplaceTable(_) - | DdlCommand::AlterSourceColumn(_) + | DdlCommand::AlterNonSharedSource(_, _) | DdlCommand::CreateSubscription(_) => false, } } @@ -299,8 +299,8 @@ impl DdlController { 
DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await, DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await, DdlCommand::DropSchema(schema_id) => ctrl.drop_schema(schema_id).await, - DdlCommand::CreateSourceWithoutStreamingJob(source) => { - ctrl.create_source_without_streaming_job(source).await + DdlCommand::CreateNonSharedSource(source) => { + ctrl.create_non_shared_source(source).await } DdlCommand::DropSource(source_id, drop_mode) => { ctrl.drop_source(source_id, drop_mode).await @@ -335,7 +335,7 @@ impl DdlController { fragment_graph, col_index_mapping, }) => { - ctrl.replace_table(streaming_job, fragment_graph, col_index_mapping) + ctrl.replace_job(streaming_job, fragment_graph, col_index_mapping) .await } DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await, @@ -354,7 +354,9 @@ impl DdlController { DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await, DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await, DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await, - DdlCommand::AlterSourceColumn(source) => ctrl.alter_source(source).await, + DdlCommand::AlterNonSharedSource(source, plan) => { + ctrl.alter_non_shared_source(source, plan).await + } DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await, DdlCommand::CreateSubscription(subscription) => { ctrl.create_subscription(subscription).await @@ -437,10 +439,7 @@ impl DdlController { } /// Shared source is handled in [`Self::create_streaming_job`] - async fn create_source_without_streaming_job( - &self, - source: Source, - ) -> MetaResult { + async fn create_non_shared_source(&self, source: Source) -> MetaResult { let handle = create_source_worker_handle(&source, self.source_manager.metrics.clone()) .await .context("failed to create source worker")?; @@ -467,11 +466,15 @@ impl DdlController { /// This replaces the source in the catalog. /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated. - async fn alter_source(&self, source: Source) -> MetaResult { + async fn alter_non_shared_source( + &self, + source: Source, + plan: Option, + ) -> MetaResult { self.metadata_manager .catalog_controller - .alter_source(source) - .await + .alter_non_shared_source(source) + .await? } async fn create_function(&self, function: Function) -> MetaResult { @@ -693,33 +696,11 @@ impl DdlController { fragment_graph: StreamFragmentGraph, ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragments)> { let (mut replace_table_ctx, mut stream_job_fragments) = self - .build_replace_table(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _) + .build_replace_job(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _) .await?; - let mut union_fragment_id = None; - - for (fragment_id, fragment) in &mut stream_job_fragments.fragments { - for actor in &mut fragment.actors { - if let Some(node) = &mut actor.nodes { - visit_stream_node(node, |body| { - if let NodeBody::Union(_) = body { - if let Some(union_fragment_id) = union_fragment_id.as_mut() { - // The union fragment should be unique. 
- assert_eq!(*union_fragment_id, *fragment_id); - } else { - union_fragment_id = Some(*fragment_id); - } - } - }) - }; - } - } - let target_table = streaming_job.table().unwrap(); - let union_fragment_id = - union_fragment_id.expect("fragment of placeholder merger not found"); - if let Some(creating_sink_table_fragments) = creating_sink_table_fragments { let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap(); let sink = sink.expect("sink not found"); @@ -728,7 +709,7 @@ impl DdlController { &sink_fragment, target_table, &mut replace_table_ctx, - stream_job_fragments.fragment_mut(union_fragment_id), + stream_job_fragments.union_fragment_for_table(), None, ); } @@ -766,7 +747,7 @@ impl DdlController { &sink_fragment, target_table, &mut replace_table_ctx, - stream_job_fragments.fragment_mut(union_fragment_id), + stream_job_fragments.union_fragment_for_table(), Some(&sink.unique_identity()), ); } @@ -1015,8 +996,7 @@ impl DdlController { ) -> MetaResult { let mut fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; - streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); - streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + streaming_job.set_info_from_graph(&fragment_graph); // create internal table catalogs and refill table id. let incomplete_internal_tables = fragment_graph @@ -1050,8 +1030,7 @@ impl DdlController { let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; - streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); - streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + streaming_job.set_info_from_graph(&fragment_graph); let streaming_job = streaming_job; Some((streaming_job, fragment_graph)) @@ -1124,6 +1103,7 @@ impl DdlController { } } + /// `target_replace_info`: when dropping a sink into table, we need to replace the table. pub async fn drop_object( &self, object_type: ObjectType, @@ -1196,8 +1176,7 @@ impl DdlController { let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; - streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); - streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + streaming_job.set_info_from_graph(&fragment_graph); let streaming_job = streaming_job; streaming_job.table().expect("should be table job"); @@ -1321,13 +1300,23 @@ impl DdlController { Ok(version) } - /// This is used for `ALTER TABLE ADD/DROP COLUMN`. - pub async fn replace_table( + /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`. + pub async fn replace_job( &self, mut streaming_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, + // TODO(alter-source): what is this table_col_index_mapping: Option, ) -> MetaResult { + match &mut streaming_job { + StreamingJob::Table(..) | StreamingJob::Source(..) => {} + StreamingJob::MaterializedView(..) + | StreamingJob::Sink(..) + | StreamingJob::Index(..) => { + bail_not_implemented!("schema change for {}", streaming_job.job_type_str()) + } + } + let job_id = streaming_job.id(); let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; @@ -1345,13 +1334,11 @@ impl DdlController { // 1. build fragment graph. 
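Regarding the `TODO(alter-source)` above: `table_col_index_mapping` appears to map each old column index to its new index, with dropped columns mapped to nothing, so rewriting a downstream dispatch strategy fails exactly when a referenced column was dropped (the "unable to drop the column ..." error later in this hunk). A simplified model of that rewrite, not the real `ColIndexMapping` API:

/// Old-to-new column index mapping: `map[i] == None` means old column `i` was dropped.
struct ColIndexMapping {
    map: Vec<Option<usize>>,
}

impl ColIndexMapping {
    /// Rewrite a list of column indices (e.g. a dispatcher's distribution key).
    /// Returns `None` if any referenced column no longer exists.
    fn rewrite_indices(&self, indices: &[usize]) -> Option<Vec<usize>> {
        indices.iter().map(|&i| self.map.get(i).copied().flatten()).collect()
    }
}

fn main() {
    // Old schema had 3 columns; column 1 was dropped, column 2 shifted to index 1.
    let mapping = ColIndexMapping { map: vec![Some(0), None, Some(1)] };
    assert_eq!(mapping.rewrite_indices(&[0, 2]), Some(vec![0, 1]));
    assert_eq!(mapping.rewrite_indices(&[1]), None); // downstream references a dropped column
}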
let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; - streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); - streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + streaming_job.set_info_from_graph(&fragment_graph); + + // make it immutable let streaming_job = streaming_job; - let StreamingJob::Table(_, table, ..) = &streaming_job else { - unreachable!("unexpected job: {streaming_job:?}") - }; let tmp_id = self .metadata_manager .catalog_controller @@ -1368,7 +1355,7 @@ impl DdlController { let result: MetaResult> = try { let (mut ctx, mut stream_job_fragments) = self - .build_replace_table( + .build_replace_job( ctx, &streaming_job, fragment_graph, @@ -1377,55 +1364,36 @@ impl DdlController { ) .await?; - let mut union_fragment_id = None; - - for (fragment_id, fragment) in &mut stream_job_fragments.fragments { - for actor in &mut fragment.actors { - if let Some(node) = &mut actor.nodes { - visit_stream_node(node, |body| { - if let NodeBody::Union(_) = body { - if let Some(union_fragment_id) = union_fragment_id.as_mut() { - // The union fragment should be unique. - assert_eq!(*union_fragment_id, *fragment_id); - } else { - union_fragment_id = Some(*fragment_id); - } - } - }) - }; - } - } - - let union_fragment_id = - union_fragment_id.expect("fragment of placeholder merger not found"); - let union_fragment = stream_job_fragments.fragment_mut(union_fragment_id); - - let catalogs = self - .metadata_manager - .get_sink_catalog_by_ids(&table.incoming_sinks) - .await?; - - for sink in catalogs { - let sink_id = &sink.id; - - let sink_table_fragments = self + if let StreamingJob::Table(_, table, ..) = &streaming_job { + let catalogs = self .metadata_manager - .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id)) + .get_sink_catalog_by_ids(&table.incoming_sinks) .await?; - let sink_fragment = sink_table_fragments.sink_fragment().unwrap(); + for sink in catalogs { + let sink_id = &sink.id; - Self::inject_replace_table_plan_for_sink( - Some(*sink_id), - &sink_fragment, - table, - &mut ctx, - union_fragment, - Some(&sink.unique_identity()), - ); + let sink_table_fragments = self + .metadata_manager + .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new( + *sink_id, + )) + .await?; + + let sink_fragment = sink_table_fragments.sink_fragment().unwrap(); - if sink.original_target_columns.is_empty() { - updated_sink_catalogs.push(sink.id as _); + Self::inject_replace_table_plan_for_sink( + Some(*sink_id), + &sink_fragment, + table, + &mut ctx, + stream_job_fragments.union_fragment_for_table(), + Some(&sink.unique_identity()), + ); + + if sink.original_target_columns.is_empty() { + updated_sink_catalogs.push(sink.id as _); + } } } @@ -1462,12 +1430,12 @@ impl DdlController { Ok(version) } Err(err) => { - tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace table"); + tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job"); let _ = self.metadata_manager .catalog_controller .try_abort_replacing_streaming_job(tmp_id) .await.inspect_err(|err| { - tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing table"); + tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job"); }); Err(err) } @@ -1719,18 +1687,19 @@ impl DdlController { Ok((ctx, stream_job_fragments)) } - /// `build_replace_table` builds a table replacement and returns the context and new table + /// `build_replace_table` builds a job 
replacement and returns the context and new job /// fragments. /// - /// Note that we use a dummy ID for the new table fragments and replace it with the real one after + /// Note that we use a dummy ID for the new job fragments and replace it with the real one after /// replacement is finished. - pub(crate) async fn build_replace_table( + pub(crate) async fn build_replace_job( &self, stream_ctx: StreamContext, stream_job: &StreamingJob, mut fragment_graph: StreamFragmentGraph, + // TODO(alter-source): check what does this mean table_col_index_mapping: Option, - tmp_table_id: TableId, + tmp_job_id: TableId, ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragments)> { let id = stream_job.id(); let expr_context = stream_ctx.to_expr_context(); @@ -1749,50 +1718,47 @@ impl DdlController { // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete // graph that contains all information needed for building the actor graph. - let original_table_fragment = old_fragments + let original_mview_fragment = old_fragments .mview_fragment() .expect("mview fragment not found"); let job_type = StreamingJobType::from(stream_job); - let StreamingJobType::Table(table_job_type) = &job_type else { + if !matches!( + job_type, + StreamingJobType::Table(_) | StreamingJobType::Source + ) { bail!( - "only support replacing table streaming job, job_type: {:?}", + "only support replacing table or source streaming job, job_type: {:?}", job_type ) }; // Map the column indices in the dispatchers with the given mapping. - let (downstream_fragments, downstream_actor_location) = self - .metadata_manager - .get_downstream_chain_fragments(id) - .await?; - let downstream_fragments = downstream_fragments - .into_iter() - .map(|(d, f)| - if let Some(mapping) = &table_col_index_mapping { - Some((mapping.rewrite_dispatch_strategy(&d)?, f)) - } else { - Some((d, f)) - }) - .collect::>() - .ok_or_else(|| { - // The `rewrite` only fails if some column is dropped. - MetaError::invalid_parameter( - "unable to drop the column due to being referenced by downstream materialized views or sinks", - ) - })?; + let (mut downstream_fragments, downstream_actor_location) = + self.metadata_manager.get_downstream_fragments(id).await?; + if let Some(mapping) = &table_col_index_mapping { + for (d, _f) in &mut downstream_fragments { + *d = mapping.rewrite_dispatch_strategy(d).ok_or_else(|| { + // The `rewrite` only fails if some column is dropped. + MetaError::invalid_parameter( + "unable to drop the column due to being referenced by downstream materialized views or sinks", + ) + })?; + } + } // build complete graph based on the table job type - let complete_graph = match table_job_type { - TableJobType::General => CompleteStreamFragmentGraph::with_downstreams( - fragment_graph, - original_table_fragment.fragment_id, - downstream_fragments, - downstream_actor_location, - job_type, - )?, - - TableJobType::SharedCdcSource => { + let complete_graph = match &job_type { + StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => { + CompleteStreamFragmentGraph::with_downstreams( + fragment_graph, + original_mview_fragment.fragment_id, + downstream_fragments, + downstream_actor_location, + job_type, + )? 
+ } + StreamingJobType::Table(TableJobType::SharedCdcSource) => { // get the upstream fragment which should be the cdc source let (upstream_root_fragments, upstream_actor_location) = self .metadata_manager @@ -1803,21 +1769,21 @@ impl DdlController { fragment_graph, upstream_root_fragments, upstream_actor_location, - original_table_fragment.fragment_id, + original_mview_fragment.fragment_id, downstream_fragments, downstream_actor_location, job_type, )? } - TableJobType::Unspecified => { - unreachable!() - } + _ => unreachable!(), }; // 2. Build the actor graph. let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?; - let parallelism = NonZeroUsize::new(original_table_fragment.get_actors().len()) + // XXX: what is this parallelism? + // Is it "assigned parallelism"? + let parallelism = NonZeroUsize::new(original_mview_fragment.get_actors().len()) .expect("The number of actors in the original table fragment should be greater than 0"); let actor_graph_builder = @@ -1831,8 +1797,11 @@ impl DdlController { merge_updates, } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?; - // general table job type does not have upstream job, so the dispatchers should be empty - if matches!(table_job_type, TableJobType::General) { + // general table & source does not have upstream job, so the dispatchers should be empty + if matches!( + job_type, + StreamingJobType::Source | StreamingJobType::Table(TableJobType::General) + ) { assert!(dispatchers.is_empty()); } @@ -1840,7 +1809,7 @@ impl DdlController { // the context that contains all information needed for building the actors on the compute // nodes. let stream_job_fragments = StreamJobFragments::new( - (tmp_table_id as u32).into(), + (tmp_job_id as u32).into(), graph, &building_locations.actor_locations, stream_ctx, @@ -1850,6 +1819,7 @@ impl DdlController { // Note: no need to set `vnode_count` as it's already set by the frontend. // See `get_replace_table_plan`. + // TODO(alter-source): check what does this mean let ctx = ReplaceStreamJobContext { old_fragments, @@ -1858,7 +1828,7 @@ impl DdlController { building_locations, existing_locations, streaming_job: stream_job.clone(), - tmp_id: tmp_table_id as _, + tmp_id: tmp_job_id as _, }; Ok((ctx, stream_job_fragments)) diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 2b8a10eed79f3..ab848c2b365b9 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -708,7 +708,8 @@ impl CompleteStreamFragmentGraph { } } - /// Create a new [`CompleteStreamFragmentGraph`] for MV on MV and CDC/Source Table with the upstream existing + /// Create a new [`CompleteStreamFragmentGraph`] for newly created job (which has no downstreams). + /// e.g., MV on MV and CDC/Source Table with the upstream existing /// `Materialize` or `Source` fragments. pub fn with_upstreams( graph: StreamFragmentGraph, @@ -727,8 +728,8 @@ impl CompleteStreamFragmentGraph { ) } - /// Create a new [`CompleteStreamFragmentGraph`] for replacing an existing table, with the - /// downstream existing `StreamScan` fragments. + /// Create a new [`CompleteStreamFragmentGraph`] for replacing an existing table/source, + /// with the downstream existing `StreamScan`/`StreamSourceScan` fragments. 
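A toy model of what `with_downstreams` (below) contributes for a replace: keep the new graph's internal edges and additionally attach the existing `StreamScan`/`StreamSourceScan` fragments to the new root, so the actor graph builder can rewire them. Fragment ids and the edge representation are illustrative only:

use std::collections::HashMap;

type FragmentId = u32;

/// Edges of a fragment graph: upstream fragment -> its downstream fragments.
type Edges = HashMap<FragmentId, Vec<FragmentId>>;

/// "Complete" the new graph: keep its internal edges and additionally connect the
/// new root fragment to every fragment that consumed the original root fragment.
fn with_downstreams(
    mut new_graph: Edges,
    new_root: FragmentId,
    existing_downstreams: &[FragmentId],
) -> Edges {
    new_graph
        .entry(new_root)
        .or_default()
        .extend_from_slice(existing_downstreams);
    new_graph
}

fn main() {
    // New replacement graph: source fragment 11 -> mview fragment 10.
    let new_graph: Edges = HashMap::from([(11, vec![10])]);
    // Fragments 3 and 4 were scanning the original table/source.
    let complete = with_downstreams(new_graph, 10, &[3, 4]);
    assert_eq!(complete[&10], vec![3, 4]);
}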
pub fn with_downstreams( graph: StreamFragmentGraph, original_table_fragment_id: FragmentId, @@ -748,7 +749,7 @@ impl CompleteStreamFragmentGraph { ) } - /// For replacing an existing table based on shared cdc source + /// For replacing an existing table based on shared cdc source, which has both upstreams and downstreams. pub fn with_upstreams_and_downstreams( graph: StreamFragmentGraph, upstream_root_fragments: HashMap, diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index e204600729898..0f2057afb403a 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -16,6 +16,7 @@ #![expect(clippy::all)] #![expect(clippy::doc_markdown)] +use std::collections::HashMap; use std::str::FromStr; use plan_common::AdditionalColumn; @@ -341,6 +342,30 @@ impl stream_plan::FragmentTypeFlag { } } +impl stream_plan::Dispatcher { + pub fn as_strategy(&self) -> stream_plan::DispatchStrategy { + stream_plan::DispatchStrategy { + r#type: self.r#type, + dist_key_indices: self.dist_key_indices.clone(), + output_indices: self.output_indices.clone(), + } + } +} + +impl meta::table_fragments::Fragment { + pub fn dispatches(&self) -> HashMap { + self.actors[0] + .dispatcher + .iter() + .map(|d| { + let fragment_id = d.dispatcher_id as _; + let strategy = d.as_strategy(); + (fragment_id, strategy) + }) + .collect() + } +} + impl catalog::StreamSourceInfo { /// Refer to [`Self::cdc_source_job`] for details. pub fn is_shared(&self) -> bool { diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index edf83355651aa..3e249f512c2bf 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -551,9 +551,14 @@ impl MetaClient { .ok_or_else(|| anyhow!("wait version not set"))?) } - pub async fn alter_source(&self, source: PbSource) -> Result { + pub async fn alter_source( + &self, + source: PbSource, + plan: Option, + ) -> Result { let request = AlterSourceRequest { source: Some(source), + plan, }; let resp = self.inner.alter_source(request).await?; Ok(resp