diff --git a/proto/catalog.proto b/proto/catalog.proto index 663f146ff415e..0448af750273c 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -316,6 +316,11 @@ message Function { message AggregateFunction {} } +// Includes full information about a table. +// +// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed. +// It is not the same as a user-side table created by `CREATE TABLE`. +// // See `TableCatalog` struct in frontend crate for more information. message Table { enum TableType { diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 884296f76fed9..59d8b2f999371 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -354,7 +354,7 @@ message DropIndexResponse { } message ReplaceTablePlan { - // The new table catalog, with the correct table ID and a new version. + // The new table catalog, with the correct (old) table ID and a new version. // If the new version does not match the subsequent version in the meta service's // catalog, this request will be rejected. catalog.Table table = 1; diff --git a/proto/meta.proto b/proto/meta.proto index 5c6d1c64274fd..37527d6a87ac0 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -36,7 +36,9 @@ service HeartbeatService { rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse); } -// Fragments of a Streaming Job +// Fragments of a Streaming Job. +// It's for all kinds of streaming jobs, and ideally should be called `StreamingJobFragments`. +// It's not the same as a storage table correlated with a `TableCatalog`. message TableFragments { // The state of the fragments of this table enum State { @@ -96,6 +98,7 @@ message TableFragments { // Use `VnodeCountCompat::vnode_count` to access it. optional uint32 maybe_vnode_count = 8; } + // The id of the streaming job. uint32 table_id = 1; State state = 2; map fragments = 3; diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 3c13715a63f28..6ae880ca14830 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -35,14 +35,12 @@ use crate::expr::ExprImpl; use crate::optimizer::property::Cardinality; use crate::user::UserId; -/// Includes full information about a table. +/// `TableCatalog` Includes full information about a table. /// -/// Currently, it can be either: -/// - a table or a source -/// - a materialized view -/// - an index +/// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed. +/// It is not the same as a user-side table created by `CREATE TABLE`. /// -/// Use `self.table_type()` to determine the type of the table. +/// Use [`Self::table_type()`] to determine the [`TableType`] of the table. /// /// # Column ID & Column Index /// @@ -191,6 +189,7 @@ pub enum TableType { /// Tables created by `CREATE MATERIALIZED VIEW`. MaterializedView, /// Tables serving as index for `TableType::Table` or `TableType::MaterializedView`. + /// An index has both a `TableCatalog` and an `IndexCatalog`. Index, /// Internal tables for executors. Internal, diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs index 1489738c2f9e2..c91f63196c667 100644 --- a/src/meta/src/barrier/checkpoint/control.rs +++ b/src/meta/src/barrier/checkpoint/control.rs @@ -232,7 +232,11 @@ impl CheckpointControl { .values() { progress.extend([( - creating_job.info.table_fragments.table_id().table_id, + creating_job + .info + .stream_job_fragments + .stream_job_id() + .table_id, creating_job.gen_ddl_progress(), )]); } @@ -676,7 +680,7 @@ impl DatabaseCheckpointControl { resps, self.creating_streaming_job_controls[&table_id] .info - .table_fragments + .stream_job_fragments .all_table_ids() .map(TableId::new), is_first_time, @@ -830,7 +834,7 @@ impl DatabaseCheckpointControl { .expect("checked Some") .to_mutation(None) .expect("should have some mutation in `CreateStreamingJob` command"); - let job_id = info.table_fragments.table_id(); + let job_id = info.stream_job_fragments.stream_job_id(); control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?; self.creating_streaming_job_controls.insert( job_id, diff --git a/src/meta/src/barrier/checkpoint/creating_job/mod.rs b/src/meta/src/barrier/checkpoint/creating_job/mod.rs index db50b9d8335f4..ef1f5a8a886c4 100644 --- a/src/meta/src/barrier/checkpoint/creating_job/mod.rs +++ b/src/meta/src/barrier/checkpoint/creating_job/mod.rs @@ -61,19 +61,19 @@ impl CreatingStreamingJobControl { initial_mutation: Mutation, ) -> Self { info!( - table_id = info.table_fragments.table_id().table_id, + table_id = info.stream_job_fragments.stream_job_id().table_id, definition = info.definition, "new creating job" ); - let snapshot_backfill_actors = info.table_fragments.snapshot_backfill_actor_ids(); + let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids(); let mut create_mview_tracker = CreateMviewProgressTracker::default(); create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat); let fragment_infos: HashMap<_, _> = info.new_fragment_info().collect(); - let table_id = info.table_fragments.table_id(); + let table_id = info.stream_job_fragments.stream_job_id(); let table_id_str = format!("{}", table_id.table_id); - let actors_to_create = info.table_fragments.actors_to_create(); + let actors_to_create = info.stream_job_fragments.actors_to_create(); let graph_info = InflightStreamingJobInfo { job_id: table_id, fragment_infos, @@ -121,7 +121,7 @@ impl CreatingStreamingJobControl { } else { let progress = create_mview_tracker .gen_ddl_progress() - .remove(&self.info.table_fragments.table_id().table_id) + .remove(&self.info.stream_job_fragments.stream_job_id().table_id) .expect("should exist"); format!("Snapshot [{}]", progress.progress) } @@ -143,7 +143,7 @@ impl CreatingStreamingJobControl { } }; DdlProgress { - id: self.info.table_fragments.table_id().table_id as u64, + id: self.info.stream_job_fragments.stream_job_id().table_id as u64, statement: self.info.definition.clone(), progress, } @@ -202,7 +202,7 @@ impl CreatingStreamingJobControl { command: Option<&Command>, barrier_info: &BarrierInfo, ) -> MetaResult<()> { - let table_id = self.info.table_fragments.table_id(); + let table_id = self.info.stream_job_fragments.stream_job_id(); let start_consume_upstream = if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command { jobs_to_merge.contains_key(&table_id) @@ -211,7 +211,7 @@ impl CreatingStreamingJobControl { }; if start_consume_upstream { info!( - table_id = self.info.table_fragments.table_id().table_id, + table_id = self.info.stream_job_fragments.stream_job_id().table_id, prev_epoch = barrier_info.prev_epoch(), "start consuming upstream" ); @@ -235,7 +235,7 @@ impl CreatingStreamingJobControl { { Self::inject_barrier( DatabaseId::new(self.info.streaming_job.database_id()), - self.info.table_fragments.table_id(), + self.info.stream_job_fragments.stream_job_id(), control_stream_manager, &mut self.barrier_control, &self.graph_info, @@ -260,7 +260,7 @@ impl CreatingStreamingJobControl { let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress); self.barrier_control.collect(epoch, worker_id, resp); if let Some(prev_barriers_to_inject) = prev_barriers_to_inject { - let table_id = self.info.table_fragments.table_id(); + let table_id = self.info.stream_job_fragments.stream_job_id(); for info in prev_barriers_to_inject { Self::inject_barrier( DatabaseId::new(self.info.streaming_job.database_id()), diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index f19feadabed56..2a5db2ab9eb5a 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -49,7 +49,7 @@ use crate::barrier::InflightSubscriptionInfo; use crate::controller::fragment::InflightFragmentInfo; use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo}; use crate::manager::{DdlType, StreamingJob}; -use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; +use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig}; /// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors @@ -88,8 +88,8 @@ pub struct Reschedule { /// Used for `ALTER TABLE` ([`Command::ReplaceTable`]) and sink into table ([`Command::CreateStreamingJob`]). #[derive(Debug, Clone)] pub struct ReplaceTablePlan { - pub old_table_fragments: TableFragments, - pub new_table_fragments: TableFragments, + pub old_fragments: StreamJobFragments, + pub new_fragments: StreamJobFragments, pub merge_updates: Vec, pub dispatchers: HashMap>, /// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids. @@ -107,7 +107,7 @@ pub struct ReplaceTablePlan { impl ReplaceTablePlan { fn fragment_changes(&self) -> HashMap { let mut fragment_changes = HashMap::new(); - for fragment in self.new_table_fragments.fragments.values() { + for fragment in self.new_fragments.fragments.values() { let fragment_change = CommandFragmentChanges::NewFragment( self.streaming_job.id().into(), InflightFragmentInfo { @@ -117,7 +117,7 @@ impl ReplaceTablePlan { .map(|actor| { ( actor.actor_id, - self.new_table_fragments + self.new_fragments .actor_status .get(&actor.actor_id) .expect("should exist") @@ -136,7 +136,7 @@ impl ReplaceTablePlan { .insert(fragment.fragment_id, fragment_change) .is_none()); } - for fragment in self.old_table_fragments.fragments.values() { + for fragment in self.old_fragments.fragments.values() { assert!(fragment_changes .insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment) .is_none()); @@ -149,7 +149,7 @@ impl ReplaceTablePlan { #[educe(Debug)] pub struct CreateStreamingJobCommandInfo { #[educe(Debug(ignore))] - pub table_fragments: TableFragments, + pub stream_job_fragments: StreamJobFragments, /// Refer to the doc on [`crate::manager::MetadataManager::get_upstream_root_fragments`] for the meaning of "root". pub upstream_root_actors: HashMap>, pub dispatchers: HashMap>, @@ -165,32 +165,36 @@ impl CreateStreamingJobCommandInfo { pub(super) fn new_fragment_info( &self, ) -> impl Iterator + '_ { - self.table_fragments.fragments.values().map(|fragment| { - ( - fragment.fragment_id, - InflightFragmentInfo { - actors: fragment - .actors - .iter() - .map(|actor| { - ( - actor.actor_id, - self.table_fragments - .actor_status - .get(&actor.actor_id) - .expect("should exist") - .worker_id() as WorkerId, - ) - }) - .collect(), - state_table_ids: fragment - .state_table_ids - .iter() - .map(|table_id| TableId::new(*table_id)) - .collect(), - }, - ) - }) + self.stream_job_fragments + .fragments + .values() + .map(|fragment| { + ( + fragment.fragment_id, + InflightFragmentInfo { + actors: fragment + .actors + .iter() + .map(|actor| { + ( + actor.actor_id, + self.stream_job_fragments + .actor_status + .get(&actor.actor_id) + .expect("should exist") + .worker_id() + as WorkerId, + ) + }) + .collect(), + state_table_ids: fragment + .state_table_ids + .iter() + .map(|table_id| TableId::new(*table_id)) + .collect(), + }, + ) + }) } } @@ -309,9 +313,9 @@ impl Command { Self::Resume(reason) } - pub fn cancel(table_fragments: &TableFragments) -> Self { + pub fn cancel(table_fragments: &StreamJobFragments) -> Self { Self::DropStreamingJobs { - table_fragments_ids: HashSet::from_iter([table_fragments.table_id()]), + table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]), actors: table_fragments.actor_ids(), unregistered_state_table_ids: table_fragments .all_table_ids() @@ -512,7 +516,7 @@ impl CommandContext { &self.command && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) { - let table_fragments = &info.table_fragments; + let table_fragments = &info.stream_job_fragments; let mut table_ids: HashSet<_> = table_fragments .internal_table_ids() .into_iter() @@ -637,7 +641,7 @@ impl Command { Command::CreateStreamingJob { info: CreateStreamingJobCommandInfo { - table_fragments, + stream_job_fragments: table_fragments, dispatchers, init_split_assignment: split_assignment, .. @@ -668,7 +672,7 @@ impl Command { .upstream_mv_table_ids .iter() .map(|table_id| SubscriptionUpstreamInfo { - subscriber_id: table_fragments.table_id().table_id, + subscriber_id: table_fragments.stream_job_id().table_id, upstream_mv_table_id: table_id.table_id, }) .collect() @@ -685,8 +689,8 @@ impl Command { })); if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan { - old_table_fragments, - new_table_fragments: _, + old_fragments, + new_fragments: _, merge_updates, dispatchers, init_split_assignment, @@ -695,7 +699,7 @@ impl Command { { // TODO: support in v2. let update = Self::generate_update_mutation_for_replace_table( - old_table_fragments, + old_fragments, merge_updates, dispatchers, init_split_assignment, @@ -728,13 +732,13 @@ impl Command { } Command::ReplaceTable(ReplaceTablePlan { - old_table_fragments, + old_fragments, merge_updates, dispatchers, init_split_assignment, .. }) => Self::generate_update_mutation_for_replace_table( - old_table_fragments, + old_fragments, merge_updates, dispatchers, init_split_assignment, @@ -916,14 +920,14 @@ impl Command { let mut map = match job_type { CreateStreamingJobType::Normal => HashMap::new(), CreateStreamingJobType::SinkIntoTable(replace_table) => { - replace_table.new_table_fragments.actors_to_create() + replace_table.new_fragments.actors_to_create() } CreateStreamingJobType::SnapshotBackfill(_) => { // for snapshot backfill, the actors to create is measured separately return None; } }; - for (worker_id, new_actors) in info.table_fragments.actors_to_create() { + for (worker_id, new_actors) in info.stream_job_fragments.actors_to_create() { map.entry(worker_id).or_default().extend(new_actors) } Some(map) @@ -940,19 +944,19 @@ impl Command { Some(map) } Command::ReplaceTable(replace_table) => { - Some(replace_table.new_table_fragments.actors_to_create()) + Some(replace_table.new_fragments.actors_to_create()) } _ => None, } } fn generate_update_mutation_for_replace_table( - old_table_fragments: &TableFragments, + old_fragments: &StreamJobFragments, merge_updates: &[MergeUpdate], dispatchers: &HashMap>, init_split_assignment: &SplitAssignment, ) -> Option { - let dropped_actors = old_table_fragments.actor_ids(); + let dropped_actors = old_fragments.actor_ids(); let actor_new_dispatchers = dispatchers .iter() diff --git a/src/meta/src/barrier/context/context_impl.rs b/src/meta/src/barrier/context/context_impl.rs index 2ccdee2782861..a687312f29e16 100644 --- a/src/meta/src/barrier/context/context_impl.rs +++ b/src/meta/src/barrier/context/context_impl.rs @@ -164,7 +164,7 @@ impl CommandContext { Command::CreateStreamingJob { info, job_type } => { let CreateStreamingJobCommandInfo { - table_fragments, + stream_job_fragments, dispatchers, init_split_assignment, .. @@ -172,16 +172,16 @@ impl CommandContext { barrier_manager_context .metadata_manager .catalog_controller - .post_collect_table_fragments( - table_fragments.table_id().table_id as _, - table_fragments.actor_ids(), + .post_collect_job_fragments( + stream_job_fragments.stream_job_id().table_id as _, + stream_job_fragments.actor_ids(), dispatchers.clone(), init_split_assignment, ) .await?; if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan { - new_table_fragments, + new_fragments, dispatchers, init_split_assignment, .. @@ -190,9 +190,9 @@ impl CommandContext { barrier_manager_context .metadata_manager .catalog_controller - .post_collect_table_fragments( - new_table_fragments.table_id().table_id as _, - new_table_fragments.actor_ids(), + .post_collect_job_fragments( + new_fragments.stream_job_id().table_id as _, + new_fragments.actor_ids(), dispatchers.clone(), init_split_assignment, ) @@ -200,8 +200,8 @@ impl CommandContext { } // Extract the fragments that include source operators. - let source_fragments = table_fragments.stream_source_fragments(); - let backfill_fragments = table_fragments.source_backfill_fragments()?; + let source_fragments = stream_job_fragments.stream_source_fragments(); + let backfill_fragments = stream_job_fragments.source_backfill_fragments()?; barrier_manager_context .source_manager .apply_source_change( @@ -224,8 +224,8 @@ impl CommandContext { } Command::ReplaceTable(ReplaceTablePlan { - old_table_fragments, - new_table_fragments, + old_fragments, + new_fragments, dispatchers, init_split_assignment, .. @@ -234,9 +234,9 @@ impl CommandContext { barrier_manager_context .metadata_manager .catalog_controller - .post_collect_table_fragments( - new_table_fragments.table_id().table_id as _, - new_table_fragments.actor_ids(), + .post_collect_job_fragments( + new_fragments.stream_job_id().table_id as _, + new_fragments.actor_ids(), dispatchers.clone(), init_split_assignment, ) @@ -245,11 +245,11 @@ impl CommandContext { // Apply the split changes in source manager. barrier_manager_context .source_manager - .drop_source_fragments_vec(std::slice::from_ref(old_table_fragments)) + .drop_source_fragments_vec(std::slice::from_ref(old_fragments)) .await; - let source_fragments = new_table_fragments.stream_source_fragments(); + let source_fragments = new_fragments.stream_source_fragments(); // XXX: is it possible to have backfill fragments here? - let backfill_fragments = new_table_fragments.source_backfill_fragments()?; + let backfill_fragments = new_fragments.source_backfill_fragments()?; barrier_manager_context .source_manager .apply_source_change( diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs index 344d8af0a1e61..ee25f1ae84117 100644 --- a/src/meta/src/barrier/context/recovery.rs +++ b/src/meta/src/barrier/context/recovery.rs @@ -33,7 +33,7 @@ use crate::barrier::info::InflightDatabaseInfo; use crate::barrier::InflightSubscriptionInfo; use crate::controller::fragment::InflightFragmentInfo; use crate::manager::ActiveStreamingWorkerNodes; -use crate::model::{ActorId, TableFragments, TableParallelism}; +use crate::model::{ActorId, StreamJobFragments, TableParallelism}; use crate::stream::{RescheduleOptions, TableResizePolicy}; use crate::{model, MetaResult}; @@ -66,7 +66,7 @@ impl GlobalBarrierWorkerContextImpl { async fn recover_background_mv_progress( &self, - ) -> MetaResult> { + ) -> MetaResult> { let mgr = &self.metadata_manager; let mviews = mgr .catalog_controller @@ -80,14 +80,17 @@ impl GlobalBarrierWorkerContextImpl { .catalog_controller .get_job_fragments_by_id(mview.table_id) .await?; - let table_fragments = TableFragments::from_protobuf(table_fragments); - if table_fragments.tracking_progress_actor_ids().is_empty() { + let stream_job_fragments = StreamJobFragments::from_protobuf(table_fragments); + if stream_job_fragments + .tracking_progress_actor_ids() + .is_empty() + { // If there's no tracking actor in the mview, we can finish the job directly. mgr.catalog_controller .finish_streaming_job(mview.table_id, None) .await?; } else { - mview_map.insert(table_id, (mview.definition.clone(), table_fragments)); + mview_map.insert(table_id, (mview.definition.clone(), stream_job_fragments)); } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index b8680a0e520c9..4143827027693 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -26,7 +26,7 @@ use tokio::sync::oneshot::Sender; use self::notifier::Notifier; use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo}; use crate::manager::ActiveStreamingWorkerNodes; -use crate::model::{ActorId, TableFragments}; +use crate::model::{ActorId, StreamJobFragments}; use crate::{MetaError, MetaResult}; mod checkpoint; @@ -105,7 +105,7 @@ struct BarrierWorkerRuntimeInfoSnapshot { subscription_infos: HashMap, stream_actors: HashMap, source_splits: HashMap>, - background_jobs: HashMap, + background_jobs: HashMap, hummock_version_stats: HummockVersionStats, } diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index a40d526bc9ee2..eac060004116a 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -30,7 +30,7 @@ use crate::barrier::{ Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan, }; use crate::manager::{DdlType, MetadataManager}; -use crate::model::{ActorId, BackfillUpstreamType, TableFragments}; +use crate::model::{ActorId, BackfillUpstreamType, StreamJobFragments}; use crate::MetaResult; type ConsumedRows = u64; @@ -246,7 +246,7 @@ impl TrackingJob { pub(crate) fn table_to_create(&self) -> TableId { match self { - TrackingJob::New(command) => command.info.table_fragments.table_id(), + TrackingJob::New(command) => command.info.stream_job_fragments.stream_job_id(), TrackingJob::Recovered(recovered) => (recovered.id as u32).into(), } } @@ -258,7 +258,7 @@ impl std::fmt::Debug for TrackingJob { TrackingJob::New(command) => write!( f, "TrackingJob::New({:?})", - command.info.table_fragments.table_id() + command.info.stream_job_fragments.stream_job_id() ), TrackingJob::Recovered(recovered) => { write!(f, "TrackingJob::RecoveredV2({:?})", recovered.id) @@ -302,7 +302,7 @@ impl CreateMviewProgressTracker { /// 1. `CreateMviewProgress`. /// 2. `Backfill` position. pub fn recover( - mview_map: HashMap, + mview_map: HashMap, version_stats: &HummockVersionStats, ) -> Self { let mut actor_map = HashMap::new(); @@ -500,7 +500,8 @@ impl CreateMviewProgressTracker { tracing::trace!(?info, "add job to track"); let (info, actors, replace_table_info) = { let CreateStreamingJobCommandInfo { - table_fragments, .. + stream_job_fragments: table_fragments, + .. } = info; let actors = table_fragments.tracking_progress_actor_ids(); if actors.is_empty() { @@ -514,7 +515,7 @@ impl CreateMviewProgressTracker { }; let CreateStreamingJobCommandInfo { - table_fragments, + stream_job_fragments: table_fragments, upstream_root_actors, dispatchers, definition, @@ -523,7 +524,7 @@ impl CreateMviewProgressTracker { .. } = &info; - let creating_mv_id = table_fragments.table_id(); + let creating_mv_id = table_fragments.stream_job_id(); let (upstream_mv_count, upstream_total_key_count, ddl_type, create_type) = { // Keep track of how many times each upstream MV appears. diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 2b3b78ede2f70..4fc25d25b04f2 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -198,7 +198,7 @@ impl BarrierScheduler { if let Some(idx) = queue.queue.iter().position(|scheduled| { if let Command::CreateStreamingJob { info, .. } = &scheduled.command - && info.table_fragments.table_id() == table_id + && info.stream_job_fragments.stream_job_id() == table_id { true } else { diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 78e6bde07b87b..672fef180afde 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -163,7 +163,7 @@ impl CatalogController { } #[allow(clippy::type_complexity)] - pub fn extract_fragment_and_actors_from_table_fragments( + pub fn extract_fragment_and_actors_from_fragments( PbTableFragments { table_id, fragments, diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 23eec2f8075d3..f993e70beadb6 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -72,7 +72,7 @@ use crate::controller::utils::{ }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, StreamingJob}; -use crate::model::{StreamContext, TableFragments, TableParallelism}; +use crate::model::{StreamContext, StreamJobFragments, TableParallelism}; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -410,13 +410,13 @@ impl CatalogController { // making them the source of truth and performing a full replacement for those in the meta store? pub async fn prepare_streaming_job( &self, - table_fragments: &TableFragments, + stream_job_fragments: &StreamJobFragments, streaming_job: &StreamingJob, for_replace: bool, ) -> MetaResult<()> { let fragment_actors = - Self::extract_fragment_and_actors_from_table_fragments(table_fragments.to_protobuf())?; - let all_tables = table_fragments.all_tables(); + Self::extract_fragment_and_actors_from_fragments(stream_job_fragments.to_protobuf())?; + let all_tables = stream_job_fragments.all_tables(); let inner = self.inner.write().await; let txn = inner.db.begin().await?; @@ -608,7 +608,7 @@ impl CatalogController { Ok((true, Some(database_id))) } - pub async fn post_collect_table_fragments( + pub async fn post_collect_job_fragments( &self, job_id: ObjectId, actor_ids: Vec, diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index 02b63ab47de62..18bb8dfaf86b3 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -138,7 +138,7 @@ impl HummockManager { /// Unregisters `table_fragments` from compaction groups pub async fn unregister_table_fragments_vec( &self, - table_fragments: &[crate::model::TableFragments], + table_fragments: &[crate::model::StreamJobFragments], ) { self.unregister_table_ids( table_fragments @@ -683,7 +683,7 @@ mod tests { use crate::hummock::error::Result; use crate::hummock::manager::compaction_group_manager::CompactionGroupManager; use crate::hummock::test_utils::setup_compute_env; - use crate::model::TableFragments; + use crate::model::StreamJobFragments; #[tokio::test] async fn test_inner() { @@ -752,7 +752,7 @@ mod tests { #[tokio::test] async fn test_manager() { let (_, compaction_group_manager, ..) = setup_compute_env(8080).await; - let table_fragment_1 = TableFragments::for_test( + let table_fragment_1 = StreamJobFragments::for_test( TableId::new(10), BTreeMap::from([( 1, @@ -763,7 +763,7 @@ mod tests { }, )]), ); - let table_fragment_2 = TableFragments::for_test( + let table_fragment_2 = StreamJobFragments::for_test( TableId::new(20), BTreeMap::from([( 2, @@ -790,7 +790,7 @@ mod tests { compaction_group_manager .register_table_fragments( - Some(table_fragment_1.table_id().table_id), + Some(table_fragment_1.stream_job_id().table_id), table_fragment_1.internal_table_ids(), ) .await @@ -798,7 +798,7 @@ mod tests { assert_eq!(registered_number().await, 4); compaction_group_manager .register_table_fragments( - Some(table_fragment_2.table_id().table_id), + Some(table_fragment_2.stream_job_id().table_id), table_fragment_2.internal_table_ids(), ) .await @@ -827,7 +827,7 @@ mod tests { compaction_group_manager .register_table_fragments( - Some(table_fragment_1.table_id().table_id), + Some(table_fragment_1.stream_job_id().table_id), table_fragment_1.internal_table_ids(), ) .await diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index b974ad82b0536..a10d405d31dff 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -39,7 +39,7 @@ use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, Wor use crate::controller::fragment::FragmentParallelismInfo; use crate::manager::{LocalNotification, NotificationVersion}; use crate::model::{ - ActorId, ClusterId, FragmentId, SubscriptionId, TableFragments, TableParallelism, + ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId, TableParallelism, }; use crate::stream::SplitAssignment; use crate::telemetry::MetaTelemetryJobDesc; @@ -551,12 +551,15 @@ impl MetadataManager { }) } - pub async fn get_job_fragments_by_id(&self, id: &TableId) -> MetaResult { + pub async fn get_job_fragments_by_id( + &self, + job_id: &TableId, + ) -> MetaResult { let pb_table_fragments = self .catalog_controller - .get_job_fragments_by_id(id.table_id as _) + .get_job_fragments_by_id(job_id.table_id as _) .await?; - Ok(TableFragments::from_protobuf(pb_table_fragments)) + Ok(StreamJobFragments::from_protobuf(pb_table_fragments)) } pub async fn get_running_actors_of_fragment( @@ -596,14 +599,14 @@ impl MetadataManager { pub async fn get_job_fragments_by_ids( &self, ids: &[TableId], - ) -> MetaResult> { + ) -> MetaResult> { let mut table_fragments = vec![]; for id in ids { let pb_table_fragments = self .catalog_controller .get_job_fragments_by_id(id.table_id as _) .await?; - table_fragments.push(TableFragments::from_protobuf(pb_table_fragments)); + table_fragments.push(StreamJobFragments::from_protobuf(pb_table_fragments)); } Ok(table_fragments) } @@ -612,7 +615,7 @@ impl MetadataManager { let table_fragments = self.catalog_controller.table_fragments().await?; let mut actor_maps = HashMap::new(); for (_, fragments) in table_fragments { - let tf = TableFragments::from_protobuf(fragments); + let tf = StreamJobFragments::from_protobuf(fragments); for actor in tf.active_actors() { actor_maps .try_insert(actor.actor_id, actor) diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index a93a4c4d02272..e5490a86365b9 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -85,14 +85,15 @@ impl From for PbTableParallelism { } } -/// Fragments of a streaming job. +/// Fragments of a streaming job. Corresponds to [`PbTableFragments`]. +/// (It was previously called `TableFragments` due to historical reasons.) /// /// We store whole fragments in a single column family as follow: -/// `table_id` => `TableFragments`. +/// `stream_job_id` => `StreamJobFragments`. #[derive(Debug, Clone)] -pub struct TableFragments { +pub struct StreamJobFragments { /// The table id. - table_id: TableId, + stream_job_id: TableId, /// The state of the table fragments. state: State, @@ -157,10 +158,10 @@ impl StreamContext { } } -impl TableFragments { +impl StreamJobFragments { pub fn to_protobuf(&self) -> PbTableFragments { PbTableFragments { - table_id: self.table_id.table_id(), + table_id: self.stream_job_id.table_id(), state: self.state as _, fragments: self.fragments.clone().into_iter().collect(), actor_status: self.actor_status.clone().into_iter().collect(), @@ -183,7 +184,7 @@ impl TableFragments { let state = prost.state(); Self { - table_id: TableId::new(prost.table_id), + stream_job_id: TableId::new(prost.table_id), state, fragments: prost.fragments.into_iter().collect(), actor_status: prost.actor_status.into_iter().collect(), @@ -197,7 +198,7 @@ impl TableFragments { } } -impl TableFragments { +impl StreamJobFragments { /// Create a new `TableFragments` with state of `Initial`, with other fields empty. pub fn for_test(table_id: TableId, fragments: BTreeMap) -> Self { Self::new( @@ -213,7 +214,7 @@ impl TableFragments { /// Create a new `TableFragments` with state of `Initial`, with the status of actors set to /// `Inactive` on the given workers. pub fn new( - table_id: TableId, + stream_job_id: TableId, fragments: BTreeMap, actor_locations: &BTreeMap, ctx: StreamContext, @@ -234,7 +235,7 @@ impl TableFragments { .collect(); Self { - table_id, + stream_job_id, state: State::Initial, fragments, actor_status, @@ -254,8 +255,8 @@ impl TableFragments { } /// Returns the table id. - pub fn table_id(&self) -> TableId { - self.table_id + pub fn stream_job_id(&self) -> TableId { + self.stream_job_id } /// Returns the state of the table fragments. @@ -278,12 +279,6 @@ impl TableFragments { self.state == State::Initial } - /// Set the table ID. - // TODO: remove this workaround for replacing table. - pub fn set_table_id(&mut self, table_id: TableId) { - self.table_id = table_id; - } - /// Set the state of the table fragments. pub fn set_state(&mut self, state: State) { self.state = state; @@ -536,9 +531,9 @@ impl TableFragments { .fragments .values() .flat_map(|f| f.state_table_ids.iter()) - .any(|table_id| *table_id == self.table_id.table_id) + .any(|table_id| *table_id == self.stream_job_id.table_id) { - Some(self.table_id.table_id) + Some(self.stream_job_id.table_id) } else { None } @@ -580,7 +575,7 @@ impl TableFragments { self.fragments .values() .flat_map(|f| f.state_table_ids.clone()) - .filter(|&t| t != self.table_id.table_id) + .filter(|&t| t != self.stream_job_id.table_id) .collect_vec() } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 3fd8112f43065..dacf908071533 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -71,7 +71,7 @@ use crate::manager::{ DdlType, LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, IGNORED_NOTIFICATION_VERSION, }; -use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; +use crate::model::{FragmentId, StreamContext, StreamJobFragments, TableParallelism}; use crate::stream::{ create_source_worker_handle, validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption, @@ -615,7 +615,7 @@ impl DdlController { /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node pub(crate) async fn validate_cdc_table( table: &Table, - table_fragments: &TableFragments, + table_fragments: &StreamJobFragments, ) -> MetaResult<()> { let stream_scan_fragment = table_fragments .fragments @@ -666,18 +666,18 @@ impl DdlController { mgr: &MetadataManager, stream_ctx: StreamContext, sink: Option<&Sink>, - creating_sink_table_fragments: Option<&TableFragments>, + creating_sink_table_fragments: Option<&StreamJobFragments>, dropping_sink_id: Option, streaming_job: &StreamingJob, fragment_graph: StreamFragmentGraph, - ) -> MetaResult<(ReplaceTableContext, TableFragments)> { - let (mut replace_table_ctx, mut table_fragments) = self + ) -> MetaResult<(ReplaceTableContext, StreamJobFragments)> { + let (mut replace_table_ctx, mut stream_job_fragments) = self .build_replace_table(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _) .await?; let mut union_fragment_id = None; - for (fragment_id, fragment) in &mut table_fragments.fragments { + for (fragment_id, fragment) in &mut stream_job_fragments.fragments { for actor in &mut fragment.actors { if let Some(node) = &mut actor.nodes { visit_stream_node(node, |body| { @@ -707,7 +707,7 @@ impl DdlController { &sink_fragment, target_table, &mut replace_table_ctx, - &mut table_fragments, + &mut stream_job_fragments, target_fragment_id, None, ); @@ -746,7 +746,7 @@ impl DdlController { &sink_fragment, target_table, &mut replace_table_ctx, - &mut table_fragments, + &mut stream_job_fragments, target_fragment_id, Some(&sink.unique_identity()), ); @@ -754,7 +754,7 @@ impl DdlController { } // check if the union fragment is fully assigned. - for fragment in table_fragments.fragments.values_mut() { + for fragment in stream_job_fragments.fragments.values_mut() { for actor in &mut fragment.actors { if let Some(node) = &mut actor.nodes { visit_stream_node(node, |node| { @@ -766,7 +766,7 @@ impl DdlController { } } - Ok((replace_table_ctx, table_fragments)) + Ok((replace_table_ctx, stream_job_fragments)) } pub(crate) fn inject_replace_table_plan_for_sink( @@ -774,7 +774,7 @@ impl DdlController { sink_fragment: &PbFragment, table: &Table, replace_table_ctx: &mut ReplaceTableContext, - table_fragments: &mut TableFragments, + stream_job_fragments: &mut StreamJobFragments, target_fragment_id: FragmentId, unique_identity: Option<&str>, ) { @@ -784,7 +784,7 @@ impl DdlController { .map(|a| a.actor_id) .collect_vec(); - let union_fragment = table_fragments + let union_fragment = stream_job_fragments .fragments .get_mut(&target_fragment_id) .unwrap(); @@ -1048,7 +1048,7 @@ impl DdlController { // create fragment and actor catalogs. tracing::debug!(id = streaming_job.id(), "building streaming job"); - let (ctx, table_fragments) = self + let (ctx, stream_job_fragments) = self .build_stream_job( ctx, streaming_job, @@ -1061,7 +1061,7 @@ impl DdlController { match streaming_job { StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => { - Self::validate_cdc_table(table, &table_fragments).await?; + Self::validate_cdc_table(table, &stream_job_fragments).await?; } StreamingJob::Table(Some(source), ..) => { // Register the source on the connector node. @@ -1080,7 +1080,7 @@ impl DdlController { self.metadata_manager .catalog_controller - .prepare_streaming_job(&table_fragments, streaming_job, false) + .prepare_streaming_job(&stream_job_fragments, streaming_job, false) .await?; // create streaming jobs. @@ -1091,7 +1091,7 @@ impl DdlController { // FIXME(kwannoel): Unify background stream's creation path with MV below. | (CreateType::Background, StreamingJob::Sink(_, _)) => { let version = self.stream_manager - .create_streaming_job(table_fragments, ctx) + .create_streaming_job(stream_job_fragments, ctx) .await?; Ok(version) } @@ -1100,7 +1100,7 @@ impl DdlController { let fut = async move { let _ = ctrl .stream_manager - .create_streaming_job(table_fragments, ctx) + .create_streaming_job(stream_job_fragments, ctx) .await.inspect_err(|err| { tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job"); }); @@ -1202,7 +1202,7 @@ impl DdlController { ) .await? as u32; - let (ctx, table_fragments) = self + let (ctx, stream_job_fragments) = self .inject_replace_table_job_for_table_sink( tmp_id, &self.metadata_manager, @@ -1220,11 +1220,11 @@ impl DdlController { self.metadata_manager .catalog_controller - .prepare_streaming_job(&table_fragments, &streaming_job, true) + .prepare_streaming_job(&stream_job_fragments, &streaming_job, true) .await?; self.stream_manager - .replace_table(table_fragments, ctx) + .replace_table(stream_job_fragments, ctx) .await?; merge_updates @@ -1354,7 +1354,7 @@ impl DdlController { let mut updated_sink_catalogs = vec![]; let result: MetaResult> = try { - let (mut ctx, mut table_fragments) = self + let (mut ctx, mut stream_job_fragments) = self .build_replace_table( ctx, &streaming_job, @@ -1366,7 +1366,7 @@ impl DdlController { let mut union_fragment_id = None; - for (fragment_id, fragment) in &mut table_fragments.fragments { + for (fragment_id, fragment) in &mut stream_job_fragments.fragments { for actor in &mut fragment.actors { if let Some(node) = &mut actor.nodes { visit_stream_node(node, |body| { @@ -1406,7 +1406,7 @@ impl DdlController { &sink_fragment, table, &mut ctx, - &mut table_fragments, + &mut stream_job_fragments, target_fragment_id, Some(&sink.unique_identity()), ); @@ -1420,11 +1420,11 @@ impl DdlController { self.metadata_manager .catalog_controller - .prepare_streaming_job(&table_fragments, &streaming_job, true) + .prepare_streaming_job(&stream_job_fragments, &streaming_job, true) .await?; self.stream_manager - .replace_table(table_fragments, ctx) + .replace_table(stream_job_fragments, ctx) .await?; merge_updates }; @@ -1545,7 +1545,7 @@ impl DdlController { mut stream_job: StreamingJob, fragment_graph: StreamFragmentGraph, affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>, - ) -> MetaResult<(CreateStreamingJobContext, TableFragments)> { + ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragments)> { let id = stream_job.id(); let specified_parallelism = fragment_graph.specified_parallelism(); let expr_context = stream_ctx.to_expr_context(); @@ -1622,7 +1622,7 @@ impl DdlController { _ => TableParallelism::Fixed(parallelism.get()), }; - let table_fragments = TableFragments::new( + let stream_job_fragments = StreamJobFragments::new( id.into(), graph, &building_locations.actor_locations, @@ -1630,9 +1630,9 @@ impl DdlController { table_parallelism, max_parallelism.get(), ); - let internal_tables = table_fragments.internal_tables(); + let internal_tables = stream_job_fragments.internal_tables(); - if let Some(mview_fragment) = table_fragments.mview_fragment() { + if let Some(mview_fragment) = stream_job_fragments.mview_fragment() { stream_job.set_table_vnode_count(mview_fragment.vnode_count()); } @@ -1667,7 +1667,7 @@ impl DdlController { &self.metadata_manager, stream_ctx, Some(s), - Some(&table_fragments), + Some(&stream_job_fragments), None, &streaming_job, fragment_graph, @@ -1702,7 +1702,7 @@ impl DdlController { snapshot_backfill_info, }; - Ok((ctx, table_fragments)) + Ok((ctx, stream_job_fragments)) } /// `build_replace_table` builds a table replacement and returns the context and new table @@ -1717,15 +1717,15 @@ impl DdlController { mut fragment_graph: StreamFragmentGraph, table_col_index_mapping: Option, tmp_table_id: TableId, - ) -> MetaResult<(ReplaceTableContext, TableFragments)> { + ) -> MetaResult<(ReplaceTableContext, StreamJobFragments)> { let id = stream_job.id(); let expr_context = stream_ctx.to_expr_context(); - let old_table_fragments = self + let old_fragments = self .metadata_manager .get_job_fragments_by_id(&id.into()) .await?; - let old_internal_table_ids = old_table_fragments.internal_table_ids(); + let old_internal_table_ids = old_fragments.internal_table_ids(); let old_internal_tables = self .metadata_manager .get_table_catalog_by_ids(old_internal_table_ids) @@ -1735,7 +1735,7 @@ impl DdlController { // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete // graph that contains all information needed for building the actor graph. - let original_table_fragment = old_table_fragments + let original_table_fragment = old_fragments .mview_fragment() .expect("mview fragment not found"); @@ -1825,20 +1825,20 @@ impl DdlController { // 3. Build the table fragments structure that will be persisted in the stream manager, and // the context that contains all information needed for building the actors on the compute // nodes. - let table_fragments = TableFragments::new( + let stream_job_fragments = StreamJobFragments::new( (tmp_table_id as u32).into(), graph, &building_locations.actor_locations, stream_ctx, - old_table_fragments.assigned_parallelism, - old_table_fragments.max_parallelism, + old_fragments.assigned_parallelism, + old_fragments.max_parallelism, ); // Note: no need to set `vnode_count` as it's already set by the frontend. // See `get_replace_table_plan`. let ctx = ReplaceTableContext { - old_table_fragments, + old_fragments, merge_updates, dispatchers, building_locations, @@ -1847,7 +1847,7 @@ impl DdlController { tmp_id: tmp_table_id as _, }; - Ok((ctx, table_fragments)) + Ok((ctx, stream_job_fragments)) } async fn alter_name( diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index be962a63ff80f..f0fc22aadd92c 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -42,7 +42,7 @@ use tokio::{select, time}; use crate::barrier::{BarrierScheduler, Command}; use crate::manager::MetadataManager; -use crate::model::{ActorId, FragmentId, TableFragments}; +use crate::model::{ActorId, FragmentId, StreamJobFragments}; use crate::rpc::metrics::MetaMetrics; use crate::MetaResult; @@ -763,7 +763,7 @@ impl SourceManager { } /// For dropping MV. - pub async fn drop_source_fragments_vec(&self, table_fragments: &[TableFragments]) { + pub async fn drop_source_fragments_vec(&self, table_fragments: &[StreamJobFragments]) { let mut core = self.core.lock().await; // Extract the fragments that include source operators. @@ -878,11 +878,11 @@ impl SourceManager { } /// Allocates splits to actors for a newly created source executor. - pub async fn allocate_splits(&self, table_id: &TableId) -> MetaResult { + pub async fn allocate_splits(&self, job_id: &TableId) -> MetaResult { let core = self.core.lock().await; let table_fragments = core .metadata_manager - .get_job_fragments_by_id(table_id) + .get_job_fragments_by_id(job_id) .await?; let source_fragments = table_fragments.stream_source_fragments(); diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index fcd3a2c5d1859..210c9c157cdde 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -458,7 +458,7 @@ impl StreamFragmentGraph { /// `fragment_id`, `vnode_count`. They will be all filled after a `TableFragments` is built. /// Be careful when using the returned values. /// - /// See also [`crate::model::TableFragments::internal_tables`]. + /// See also [`crate::model::StreamJobFragments::internal_tables`]. pub fn incomplete_internal_tables(&self) -> BTreeMap { let mut tables = BTreeMap::new(); for fragment in self.fragments.values() { diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 2d7ab47a7c784..cd76e124a2d1f 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -35,7 +35,7 @@ use crate::barrier::{ }; use crate::error::bail_invalid_parameter; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob}; -use crate::model::{ActorId, FragmentId, TableFragments, TableParallelism}; +use crate::model::{ActorId, FragmentId, StreamJobFragments, TableParallelism}; use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; @@ -77,7 +77,7 @@ pub struct CreateStreamingJobContext { pub ddl_type: DdlType, /// Context provided for potential replace table, typically used when sinking into a table. - pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, TableFragments)>, + pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, StreamJobFragments)>, pub snapshot_backfill_info: Option, @@ -168,8 +168,8 @@ type CreatingStreamingJobInfoRef = Arc; /// /// Note: for better readability, keep this struct complete and immutable once created. pub struct ReplaceTableContext { - /// The old table fragments to be replaced. - pub old_table_fragments: TableFragments, + /// The old job fragments to be replaced. + pub old_fragments: StreamJobFragments, /// The updates to be applied to the downstream chain actors. Used for schema change. pub merge_updates: Vec, @@ -234,10 +234,10 @@ impl GlobalStreamManager { /// 4. Store related meta data. pub async fn create_streaming_job( self: &Arc, - table_fragments: TableFragments, + stream_job_fragments: StreamJobFragments, ctx: CreateStreamingJobContext, ) -> MetaResult { - let table_id = table_fragments.table_id(); + let table_id = stream_job_fragments.stream_job_id(); let database_id = ctx.streaming_job.database_id().into(); let (sender, mut receiver) = tokio::sync::mpsc::channel(10); let execution = StreamingJobExecution::new(table_id, sender.clone()); @@ -246,7 +246,7 @@ impl GlobalStreamManager { let stream_manager = self.clone(); let fut = async move { let res = stream_manager - .create_streaming_job_impl(table_fragments, ctx) + .create_streaming_job_impl(stream_job_fragments, ctx) .await; match res { Ok(version) => { @@ -324,7 +324,7 @@ impl GlobalStreamManager { async fn create_streaming_job_impl( &self, - table_fragments: TableFragments, + stream_job_fragments: StreamJobFragments, CreateStreamingJobContext { streaming_job, dispatchers, @@ -342,26 +342,26 @@ impl GlobalStreamManager { let mut replace_table_id = None; tracing::debug!( - table_id = %table_fragments.table_id(), + table_id = %stream_job_fragments.stream_job_id(), "built actors finished" ); let need_pause = replace_table_job_info.is_some(); - if let Some((streaming_job, context, table_fragments)) = replace_table_job_info { + if let Some((streaming_job, context, stream_job_fragments)) = replace_table_job_info { self.metadata_manager .catalog_controller - .prepare_streaming_job(&table_fragments, &streaming_job, true) + .prepare_streaming_job(&stream_job_fragments, &streaming_job, true) .await?; - let tmp_table_id = table_fragments.table_id(); + let tmp_table_id = stream_job_fragments.stream_job_id(); let init_split_assignment = self.source_manager.allocate_splits(&tmp_table_id).await?; replace_table_id = Some(tmp_table_id); replace_table_command = Some(ReplaceTablePlan { - old_table_fragments: context.old_table_fragments, - new_table_fragments: table_fragments, + old_fragments: context.old_fragments, + new_fragments: stream_job_fragments, merge_updates: context.merge_updates, dispatchers: context.dispatchers, init_split_assignment, @@ -370,7 +370,7 @@ impl GlobalStreamManager { }); } - let table_id = table_fragments.table_id(); + let table_id = stream_job_fragments.stream_job_id(); // Here we need to consider: // - Shared source @@ -384,7 +384,7 @@ impl GlobalStreamManager { ); let info = CreateStreamingJobCommandInfo { - table_fragments, + stream_job_fragments, upstream_root_actors, dispatchers, init_split_assignment, @@ -459,9 +459,9 @@ impl GlobalStreamManager { pub async fn replace_table( &self, - table_fragments: TableFragments, + stream_job_fragments: StreamJobFragments, ReplaceTableContext { - old_table_fragments, + old_fragments, merge_updates, dispatchers, tmp_id, @@ -469,15 +469,15 @@ impl GlobalStreamManager { .. }: ReplaceTableContext, ) -> MetaResult<()> { - let tmp_table_id = table_fragments.table_id(); + let tmp_table_id = stream_job_fragments.stream_job_id(); let init_split_assignment = self.source_manager.allocate_splits(&tmp_table_id).await?; self.barrier_scheduler .run_config_change_command_with_pause( streaming_job.database_id().into(), Command::ReplaceTable(ReplaceTablePlan { - old_table_fragments, - new_table_fragments: table_fragments, + old_fragments, + new_fragments: stream_job_fragments, merge_updates, dispatchers, init_split_assignment, diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 7c7aed920ca3b..db34e5fd312bd 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -39,7 +39,7 @@ use risingwave_pb::stream_plan::{ use crate::controller::cluster::StreamingClusterInfo; use crate::manager::{MetaSrvEnv, StreamingJob}; -use crate::model::TableFragments; +use crate::model::StreamJobFragments; use crate::stream::{ ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, StreamFragmentGraph, }; @@ -459,15 +459,15 @@ async fn test_graph_builder() -> MetaResult<()> { let ActorGraphBuildResult { graph, .. } = actor_graph_builder.generate_graph(&env, &job, expr_context)?; - let table_fragments = TableFragments::for_test(TableId::default(), graph); - let actors = table_fragments.actors(); - let mview_actor_ids = table_fragments.mview_actor_ids(); + let stream_job_fragments = StreamJobFragments::for_test(TableId::default(), graph); + let actors = stream_job_fragments.actors(); + let mview_actor_ids = stream_job_fragments.mview_actor_ids(); assert_eq!(actors.len(), 9); assert_eq!(mview_actor_ids, vec![1]); assert_eq!(internal_tables.len(), 3); - let fragment_upstreams: HashMap<_, _> = table_fragments + let fragment_upstreams: HashMap<_, _> = stream_job_fragments .fragments .iter() .map(|(fragment_id, fragment)| (*fragment_id, fragment.upstream_fragment_ids.clone()))