From c78ebdd4017610b29d735250d94066e17e90f2f7 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 21 Nov 2024 16:15:42 +0800 Subject: [PATCH] refactor: clarify the meaning of table in TableCatalog and TableFragments Signed-off-by: xxchan --- proto/catalog.proto | 5 +++ proto/ddl_service.proto | 2 +- proto/meta.proto | 5 ++- src/frontend/src/catalog/table_catalog.rs | 11 +++--- src/meta/src/barrier/checkpoint/control.rs | 4 +-- .../barrier/checkpoint/creating_job/mod.rs | 16 ++++----- src/meta/src/barrier/command.rs | 16 ++++----- src/meta/src/barrier/context/context_impl.rs | 6 ++-- src/meta/src/barrier/context/recovery.rs | 6 ++-- src/meta/src/barrier/mod.rs | 4 +-- src/meta/src/barrier/progress.rs | 10 +++--- src/meta/src/barrier/schedule.rs | 2 +- src/meta/src/controller/streaming_job.rs | 4 +-- .../compaction/compaction_group_manager.rs | 14 ++++---- src/meta/src/manager/metadata.rs | 12 +++---- src/meta/src/model/stream.rs | 34 ++++++++----------- src/meta/src/rpc/ddl_controller.rs | 18 +++++----- src/meta/src/stream/source_manager.rs | 4 +-- src/meta/src/stream/stream_manager.rs | 22 ++++++------ src/meta/src/stream/test_fragmenter.rs | 4 +-- 20 files changed, 100 insertions(+), 99 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 9bb8e62fd4a11..ddfd1d1b5e18e 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -316,6 +316,11 @@ message Function { message AggregateFunction {} } +// Includes full information about a table. +// +// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed. +// It is not the same as a user-side table created by `CREATE TABLE`. +// // See `TableCatalog` struct in frontend crate for more information. message Table { enum TableType { diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 6467bd6e1d7e7..9c898f35ff865 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -349,7 +349,7 @@ message DropIndexResponse { } message ReplaceTablePlan { - // The new table catalog, with the correct table ID and a new version. + // The new table catalog, with the correct (old) table ID and a new version. // If the new version does not match the subsequent version in the meta service's // catalog, this request will be rejected. catalog.Table table = 1; diff --git a/proto/meta.proto b/proto/meta.proto index 5c6d1c64274fd..37527d6a87ac0 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -36,7 +36,9 @@ service HeartbeatService { rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse); } -// Fragments of a Streaming Job +// Fragments of a Streaming Job. +// It's for all kinds of streaming jobs, and ideally should be called `StreamingJobFragments`. +// It's not the same as a storage table correlated with a `TableCatalog`. message TableFragments { // The state of the fragments of this table enum State { @@ -96,6 +98,7 @@ message TableFragments { // Use `VnodeCountCompat::vnode_count` to access it. optional uint32 maybe_vnode_count = 8; } + // The id of the streaming job. uint32 table_id = 1; State state = 2; map fragments = 3; diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 3c13715a63f28..6ae880ca14830 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -35,14 +35,12 @@ use crate::expr::ExprImpl; use crate::optimizer::property::Cardinality; use crate::user::UserId; -/// Includes full information about a table. 
+/// `TableCatalog` Includes full information about a table. /// -/// Currently, it can be either: -/// - a table or a source -/// - a materialized view -/// - an index +/// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed. +/// It is not the same as a user-side table created by `CREATE TABLE`. /// -/// Use `self.table_type()` to determine the type of the table. +/// Use [`Self::table_type()`] to determine the [`TableType`] of the table. /// /// # Column ID & Column Index /// @@ -191,6 +189,7 @@ pub enum TableType { /// Tables created by `CREATE MATERIALIZED VIEW`. MaterializedView, /// Tables serving as index for `TableType::Table` or `TableType::MaterializedView`. + /// An index has both a `TableCatalog` and an `IndexCatalog`. Index, /// Internal tables for executors. Internal, diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs index 1489738c2f9e2..d1319bc4d5655 100644 --- a/src/meta/src/barrier/checkpoint/control.rs +++ b/src/meta/src/barrier/checkpoint/control.rs @@ -232,7 +232,7 @@ impl CheckpointControl { .values() { progress.extend([( - creating_job.info.table_fragments.table_id().table_id, + creating_job.info.table_fragments.stream_job_id().table_id, creating_job.gen_ddl_progress(), )]); } @@ -830,7 +830,7 @@ impl DatabaseCheckpointControl { .expect("checked Some") .to_mutation(None) .expect("should have some mutation in `CreateStreamingJob` command"); - let job_id = info.table_fragments.table_id(); + let job_id = info.table_fragments.stream_job_id(); control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?; self.creating_streaming_job_controls.insert( job_id, diff --git a/src/meta/src/barrier/checkpoint/creating_job/mod.rs b/src/meta/src/barrier/checkpoint/creating_job/mod.rs index db50b9d8335f4..0fa494102d165 100644 --- a/src/meta/src/barrier/checkpoint/creating_job/mod.rs +++ b/src/meta/src/barrier/checkpoint/creating_job/mod.rs @@ -61,7 +61,7 @@ impl CreatingStreamingJobControl { initial_mutation: Mutation, ) -> Self { info!( - table_id = info.table_fragments.table_id().table_id, + table_id = info.table_fragments.stream_job_id().table_id, definition = info.definition, "new creating job" ); @@ -70,7 +70,7 @@ impl CreatingStreamingJobControl { create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat); let fragment_infos: HashMap<_, _> = info.new_fragment_info().collect(); - let table_id = info.table_fragments.table_id(); + let table_id = info.table_fragments.stream_job_id(); let table_id_str = format!("{}", table_id.table_id); let actors_to_create = info.table_fragments.actors_to_create(); @@ -121,7 +121,7 @@ impl CreatingStreamingJobControl { } else { let progress = create_mview_tracker .gen_ddl_progress() - .remove(&self.info.table_fragments.table_id().table_id) + .remove(&self.info.table_fragments.stream_job_id().table_id) .expect("should exist"); format!("Snapshot [{}]", progress.progress) } @@ -143,7 +143,7 @@ impl CreatingStreamingJobControl { } }; DdlProgress { - id: self.info.table_fragments.table_id().table_id as u64, + id: self.info.table_fragments.stream_job_id().table_id as u64, statement: self.info.definition.clone(), progress, } @@ -202,7 +202,7 @@ impl CreatingStreamingJobControl { command: Option<&Command>, barrier_info: &BarrierInfo, ) -> MetaResult<()> { - let table_id = self.info.table_fragments.table_id(); + let table_id = self.info.table_fragments.stream_job_id(); let start_consume_upstream = if let 
Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command { jobs_to_merge.contains_key(&table_id) @@ -211,7 +211,7 @@ impl CreatingStreamingJobControl { }; if start_consume_upstream { info!( - table_id = self.info.table_fragments.table_id().table_id, + table_id = self.info.table_fragments.stream_job_id().table_id, prev_epoch = barrier_info.prev_epoch(), "start consuming upstream" ); @@ -235,7 +235,7 @@ impl CreatingStreamingJobControl { { Self::inject_barrier( DatabaseId::new(self.info.streaming_job.database_id()), - self.info.table_fragments.table_id(), + self.info.table_fragments.stream_job_id(), control_stream_manager, &mut self.barrier_control, &self.graph_info, @@ -260,7 +260,7 @@ impl CreatingStreamingJobControl { let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress); self.barrier_control.collect(epoch, worker_id, resp); if let Some(prev_barriers_to_inject) = prev_barriers_to_inject { - let table_id = self.info.table_fragments.table_id(); + let table_id = self.info.table_fragments.stream_job_id(); for info in prev_barriers_to_inject { Self::inject_barrier( DatabaseId::new(self.info.streaming_job.database_id()), diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index f19feadabed56..ea100be9de3e1 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -49,7 +49,7 @@ use crate::barrier::InflightSubscriptionInfo; use crate::controller::fragment::InflightFragmentInfo; use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo}; use crate::manager::{DdlType, StreamingJob}; -use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; +use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig}; /// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors @@ -88,8 +88,8 @@ pub struct Reschedule { /// Used for `ALTER TABLE` ([`Command::ReplaceTable`]) and sink into table ([`Command::CreateStreamingJob`]). #[derive(Debug, Clone)] pub struct ReplaceTablePlan { - pub old_table_fragments: TableFragments, - pub new_table_fragments: TableFragments, + pub old_table_fragments: StreamJobFragments, + pub new_table_fragments: StreamJobFragments, pub merge_updates: Vec, pub dispatchers: HashMap>, /// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids. @@ -149,7 +149,7 @@ impl ReplaceTablePlan { #[educe(Debug)] pub struct CreateStreamingJobCommandInfo { #[educe(Debug(ignore))] - pub table_fragments: TableFragments, + pub table_fragments: StreamJobFragments, /// Refer to the doc on [`crate::manager::MetadataManager::get_upstream_root_fragments`] for the meaning of "root". 
pub upstream_root_actors: HashMap>, pub dispatchers: HashMap>, @@ -309,9 +309,9 @@ impl Command { Self::Resume(reason) } - pub fn cancel(table_fragments: &TableFragments) -> Self { + pub fn cancel(table_fragments: &StreamJobFragments) -> Self { Self::DropStreamingJobs { - table_fragments_ids: HashSet::from_iter([table_fragments.table_id()]), + table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]), actors: table_fragments.actor_ids(), unregistered_state_table_ids: table_fragments .all_table_ids() @@ -668,7 +668,7 @@ impl Command { .upstream_mv_table_ids .iter() .map(|table_id| SubscriptionUpstreamInfo { - subscriber_id: table_fragments.table_id().table_id, + subscriber_id: table_fragments.stream_job_id().table_id, upstream_mv_table_id: table_id.table_id, }) .collect() @@ -947,7 +947,7 @@ impl Command { } fn generate_update_mutation_for_replace_table( - old_table_fragments: &TableFragments, + old_table_fragments: &StreamJobFragments, merge_updates: &[MergeUpdate], dispatchers: &HashMap>, init_split_assignment: &SplitAssignment, diff --git a/src/meta/src/barrier/context/context_impl.rs b/src/meta/src/barrier/context/context_impl.rs index 2ccdee2782861..e284ca900ee98 100644 --- a/src/meta/src/barrier/context/context_impl.rs +++ b/src/meta/src/barrier/context/context_impl.rs @@ -173,7 +173,7 @@ impl CommandContext { .metadata_manager .catalog_controller .post_collect_table_fragments( - table_fragments.table_id().table_id as _, + table_fragments.stream_job_id().table_id as _, table_fragments.actor_ids(), dispatchers.clone(), init_split_assignment, @@ -191,7 +191,7 @@ impl CommandContext { .metadata_manager .catalog_controller .post_collect_table_fragments( - new_table_fragments.table_id().table_id as _, + new_table_fragments.stream_job_id().table_id as _, new_table_fragments.actor_ids(), dispatchers.clone(), init_split_assignment, @@ -235,7 +235,7 @@ impl CommandContext { .metadata_manager .catalog_controller .post_collect_table_fragments( - new_table_fragments.table_id().table_id as _, + new_table_fragments.stream_job_id().table_id as _, new_table_fragments.actor_ids(), dispatchers.clone(), init_split_assignment, diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs index 344d8af0a1e61..24558771e3bf4 100644 --- a/src/meta/src/barrier/context/recovery.rs +++ b/src/meta/src/barrier/context/recovery.rs @@ -33,7 +33,7 @@ use crate::barrier::info::InflightDatabaseInfo; use crate::barrier::InflightSubscriptionInfo; use crate::controller::fragment::InflightFragmentInfo; use crate::manager::ActiveStreamingWorkerNodes; -use crate::model::{ActorId, TableFragments, TableParallelism}; +use crate::model::{ActorId, StreamJobFragments, TableParallelism}; use crate::stream::{RescheduleOptions, TableResizePolicy}; use crate::{model, MetaResult}; @@ -66,7 +66,7 @@ impl GlobalBarrierWorkerContextImpl { async fn recover_background_mv_progress( &self, - ) -> MetaResult> { + ) -> MetaResult> { let mgr = &self.metadata_manager; let mviews = mgr .catalog_controller @@ -80,7 +80,7 @@ impl GlobalBarrierWorkerContextImpl { .catalog_controller .get_job_fragments_by_id(mview.table_id) .await?; - let table_fragments = TableFragments::from_protobuf(table_fragments); + let table_fragments = StreamJobFragments::from_protobuf(table_fragments); if table_fragments.tracking_progress_actor_ids().is_empty() { // If there's no tracking actor in the mview, we can finish the job directly. 
mgr.catalog_controller diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index b8680a0e520c9..4143827027693 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -26,7 +26,7 @@ use tokio::sync::oneshot::Sender; use self::notifier::Notifier; use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo}; use crate::manager::ActiveStreamingWorkerNodes; -use crate::model::{ActorId, TableFragments}; +use crate::model::{ActorId, StreamJobFragments}; use crate::{MetaError, MetaResult}; mod checkpoint; @@ -105,7 +105,7 @@ struct BarrierWorkerRuntimeInfoSnapshot { subscription_infos: HashMap, stream_actors: HashMap, source_splits: HashMap>, - background_jobs: HashMap, + background_jobs: HashMap, hummock_version_stats: HummockVersionStats, } diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index a40d526bc9ee2..450b8c7fb5e8f 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -30,7 +30,7 @@ use crate::barrier::{ Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan, }; use crate::manager::{DdlType, MetadataManager}; -use crate::model::{ActorId, BackfillUpstreamType, TableFragments}; +use crate::model::{ActorId, BackfillUpstreamType, StreamJobFragments}; use crate::MetaResult; type ConsumedRows = u64; @@ -246,7 +246,7 @@ impl TrackingJob { pub(crate) fn table_to_create(&self) -> TableId { match self { - TrackingJob::New(command) => command.info.table_fragments.table_id(), + TrackingJob::New(command) => command.info.table_fragments.stream_job_id(), TrackingJob::Recovered(recovered) => (recovered.id as u32).into(), } } @@ -258,7 +258,7 @@ impl std::fmt::Debug for TrackingJob { TrackingJob::New(command) => write!( f, "TrackingJob::New({:?})", - command.info.table_fragments.table_id() + command.info.table_fragments.stream_job_id() ), TrackingJob::Recovered(recovered) => { write!(f, "TrackingJob::RecoveredV2({:?})", recovered.id) @@ -302,7 +302,7 @@ impl CreateMviewProgressTracker { /// 1. `CreateMviewProgress`. /// 2. `Backfill` position. pub fn recover( - mview_map: HashMap, + mview_map: HashMap, version_stats: &HummockVersionStats, ) -> Self { let mut actor_map = HashMap::new(); @@ -523,7 +523,7 @@ impl CreateMviewProgressTracker { .. } = &info; - let creating_mv_id = table_fragments.table_id(); + let creating_mv_id = table_fragments.stream_job_id(); let (upstream_mv_count, upstream_total_key_count, ddl_type, create_type) = { // Keep track of how many times each upstream MV appears. diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 2b3b78ede2f70..fc88c574c005e 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -198,7 +198,7 @@ impl BarrierScheduler { if let Some(idx) = queue.queue.iter().position(|scheduled| { if let Command::CreateStreamingJob { info, .. 
} = &scheduled.command - && info.table_fragments.table_id() == table_id + && info.table_fragments.stream_job_id() == table_id { true } else { diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index d5ee31efae246..4efe618b4a5ec 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -72,7 +72,7 @@ use crate::controller::utils::{ }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, StreamingJob}; -use crate::model::{StreamContext, TableFragments, TableParallelism}; +use crate::model::{StreamContext, StreamJobFragments, TableParallelism}; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -403,7 +403,7 @@ impl CatalogController { // making them the source of truth and performing a full replacement for those in the meta store? pub async fn prepare_streaming_job( &self, - table_fragments: &TableFragments, + table_fragments: &StreamJobFragments, streaming_job: &StreamingJob, for_replace: bool, ) -> MetaResult<()> { diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index 02b63ab47de62..18bb8dfaf86b3 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -138,7 +138,7 @@ impl HummockManager { /// Unregisters `table_fragments` from compaction groups pub async fn unregister_table_fragments_vec( &self, - table_fragments: &[crate::model::TableFragments], + table_fragments: &[crate::model::StreamJobFragments], ) { self.unregister_table_ids( table_fragments @@ -683,7 +683,7 @@ mod tests { use crate::hummock::error::Result; use crate::hummock::manager::compaction_group_manager::CompactionGroupManager; use crate::hummock::test_utils::setup_compute_env; - use crate::model::TableFragments; + use crate::model::StreamJobFragments; #[tokio::test] async fn test_inner() { @@ -752,7 +752,7 @@ mod tests { #[tokio::test] async fn test_manager() { let (_, compaction_group_manager, ..) 
= setup_compute_env(8080).await; - let table_fragment_1 = TableFragments::for_test( + let table_fragment_1 = StreamJobFragments::for_test( TableId::new(10), BTreeMap::from([( 1, @@ -763,7 +763,7 @@ mod tests { }, )]), ); - let table_fragment_2 = TableFragments::for_test( + let table_fragment_2 = StreamJobFragments::for_test( TableId::new(20), BTreeMap::from([( 2, @@ -790,7 +790,7 @@ mod tests { compaction_group_manager .register_table_fragments( - Some(table_fragment_1.table_id().table_id), + Some(table_fragment_1.stream_job_id().table_id), table_fragment_1.internal_table_ids(), ) .await @@ -798,7 +798,7 @@ mod tests { assert_eq!(registered_number().await, 4); compaction_group_manager .register_table_fragments( - Some(table_fragment_2.table_id().table_id), + Some(table_fragment_2.stream_job_id().table_id), table_fragment_2.internal_table_ids(), ) .await @@ -827,7 +827,7 @@ mod tests { compaction_group_manager .register_table_fragments( - Some(table_fragment_1.table_id().table_id), + Some(table_fragment_1.stream_job_id().table_id), table_fragment_1.internal_table_ids(), ) .await diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index b974ad82b0536..769ce188c0bb7 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -39,7 +39,7 @@ use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, Wor use crate::controller::fragment::FragmentParallelismInfo; use crate::manager::{LocalNotification, NotificationVersion}; use crate::model::{ - ActorId, ClusterId, FragmentId, SubscriptionId, TableFragments, TableParallelism, + ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId, TableParallelism, }; use crate::stream::SplitAssignment; use crate::telemetry::MetaTelemetryJobDesc; @@ -551,12 +551,12 @@ impl MetadataManager { }) } - pub async fn get_job_fragments_by_id(&self, id: &TableId) -> MetaResult { + pub async fn get_job_fragments_by_id(&self, id: &TableId) -> MetaResult { let pb_table_fragments = self .catalog_controller .get_job_fragments_by_id(id.table_id as _) .await?; - Ok(TableFragments::from_protobuf(pb_table_fragments)) + Ok(StreamJobFragments::from_protobuf(pb_table_fragments)) } pub async fn get_running_actors_of_fragment( @@ -596,14 +596,14 @@ impl MetadataManager { pub async fn get_job_fragments_by_ids( &self, ids: &[TableId], - ) -> MetaResult> { + ) -> MetaResult> { let mut table_fragments = vec![]; for id in ids { let pb_table_fragments = self .catalog_controller .get_job_fragments_by_id(id.table_id as _) .await?; - table_fragments.push(TableFragments::from_protobuf(pb_table_fragments)); + table_fragments.push(StreamJobFragments::from_protobuf(pb_table_fragments)); } Ok(table_fragments) } @@ -612,7 +612,7 @@ impl MetadataManager { let table_fragments = self.catalog_controller.table_fragments().await?; let mut actor_maps = HashMap::new(); for (_, fragments) in table_fragments { - let tf = TableFragments::from_protobuf(fragments); + let tf = StreamJobFragments::from_protobuf(fragments); for actor in tf.active_actors() { actor_maps .try_insert(actor.actor_id, actor) diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index a93a4c4d02272..71d6d681274cc 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -85,14 +85,14 @@ impl From for PbTableParallelism { } } -/// Fragments of a streaming job. +/// Fragments of a streaming job. Corresponds to [`PbTableFragments`]. 
/// /// We store whole fragments in a single column family as follow: /// `table_id` => `TableFragments`. #[derive(Debug, Clone)] -pub struct TableFragments { +pub struct StreamJobFragments { /// The table id. - table_id: TableId, + stream_job_id: TableId, /// The state of the table fragments. state: State, @@ -157,10 +157,10 @@ impl StreamContext { } } -impl TableFragments { +impl StreamJobFragments { pub fn to_protobuf(&self) -> PbTableFragments { PbTableFragments { - table_id: self.table_id.table_id(), + table_id: self.stream_job_id.table_id(), state: self.state as _, fragments: self.fragments.clone().into_iter().collect(), actor_status: self.actor_status.clone().into_iter().collect(), @@ -183,7 +183,7 @@ impl TableFragments { let state = prost.state(); Self { - table_id: TableId::new(prost.table_id), + stream_job_id: TableId::new(prost.table_id), state, fragments: prost.fragments.into_iter().collect(), actor_status: prost.actor_status.into_iter().collect(), @@ -197,7 +197,7 @@ impl TableFragments { } } -impl TableFragments { +impl StreamJobFragments { /// Create a new `TableFragments` with state of `Initial`, with other fields empty. pub fn for_test(table_id: TableId, fragments: BTreeMap) -> Self { Self::new( @@ -213,7 +213,7 @@ impl TableFragments { /// Create a new `TableFragments` with state of `Initial`, with the status of actors set to /// `Inactive` on the given workers. pub fn new( - table_id: TableId, + stream_job_id: TableId, fragments: BTreeMap, actor_locations: &BTreeMap, ctx: StreamContext, @@ -234,7 +234,7 @@ impl TableFragments { .collect(); Self { - table_id, + stream_job_id, state: State::Initial, fragments, actor_status, @@ -254,8 +254,8 @@ impl TableFragments { } /// Returns the table id. - pub fn table_id(&self) -> TableId { - self.table_id + pub fn stream_job_id(&self) -> TableId { + self.stream_job_id } /// Returns the state of the table fragments. @@ -278,12 +278,6 @@ impl TableFragments { self.state == State::Initial } - /// Set the table ID. - // TODO: remove this workaround for replacing table. - pub fn set_table_id(&mut self, table_id: TableId) { - self.table_id = table_id; - } - /// Set the state of the table fragments. 
pub fn set_state(&mut self, state: State) { self.state = state; @@ -536,9 +530,9 @@ impl TableFragments { .fragments .values() .flat_map(|f| f.state_table_ids.iter()) - .any(|table_id| *table_id == self.table_id.table_id) + .any(|table_id| *table_id == self.stream_job_id.table_id) { - Some(self.table_id.table_id) + Some(self.stream_job_id.table_id) } else { None } @@ -580,7 +574,7 @@ impl TableFragments { self.fragments .values() .flat_map(|f| f.state_table_ids.clone()) - .filter(|&t| t != self.table_id.table_id) + .filter(|&t| t != self.stream_job_id.table_id) .collect_vec() } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 5b6b7033719c4..97404490fbc18 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -71,7 +71,7 @@ use crate::manager::{ DdlType, LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, IGNORED_NOTIFICATION_VERSION, }; -use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; +use crate::model::{FragmentId, StreamContext, StreamJobFragments, TableParallelism}; use crate::stream::{ create_source_worker_handle, validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption, @@ -612,7 +612,7 @@ impl DdlController { /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node pub(crate) async fn validate_cdc_table( table: &Table, - table_fragments: &TableFragments, + table_fragments: &StreamJobFragments, ) -> MetaResult<()> { let stream_scan_fragment = table_fragments .fragments @@ -663,11 +663,11 @@ impl DdlController { mgr: &MetadataManager, stream_ctx: StreamContext, sink: Option<&Sink>, - creating_sink_table_fragments: Option<&TableFragments>, + creating_sink_table_fragments: Option<&StreamJobFragments>, dropping_sink_id: Option, streaming_job: &StreamingJob, fragment_graph: StreamFragmentGraph, - ) -> MetaResult<(ReplaceTableContext, TableFragments)> { + ) -> MetaResult<(ReplaceTableContext, StreamJobFragments)> { let (mut replace_table_ctx, mut table_fragments) = self .build_replace_table(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _) .await?; @@ -771,7 +771,7 @@ impl DdlController { sink_fragment: &PbFragment, table: &Table, replace_table_ctx: &mut ReplaceTableContext, - table_fragments: &mut TableFragments, + table_fragments: &mut StreamJobFragments, target_fragment_id: FragmentId, unique_identity: Option<&str>, ) { @@ -1541,7 +1541,7 @@ impl DdlController { mut stream_job: StreamingJob, fragment_graph: StreamFragmentGraph, affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>, - ) -> MetaResult<(CreateStreamingJobContext, TableFragments)> { + ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragments)> { let id = stream_job.id(); let specified_parallelism = fragment_graph.specified_parallelism(); let expr_context = stream_ctx.to_expr_context(); @@ -1618,7 +1618,7 @@ impl DdlController { _ => TableParallelism::Fixed(parallelism.get()), }; - let table_fragments = TableFragments::new( + let table_fragments = StreamJobFragments::new( id.into(), graph, &building_locations.actor_locations, @@ -1713,7 +1713,7 @@ impl DdlController { mut fragment_graph: StreamFragmentGraph, table_col_index_mapping: Option, tmp_table_id: TableId, - ) -> MetaResult<(ReplaceTableContext, TableFragments)> { + ) -> MetaResult<(ReplaceTableContext, StreamJobFragments)> { let id = stream_job.id(); let expr_context = 
stream_ctx.to_expr_context(); @@ -1821,7 +1821,7 @@ impl DdlController { // 3. Build the table fragments structure that will be persisted in the stream manager, and // the context that contains all information needed for building the actors on the compute // nodes. - let table_fragments = TableFragments::new( + let table_fragments = StreamJobFragments::new( (tmp_table_id as u32).into(), graph, &building_locations.actor_locations, diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index be962a63ff80f..95912c0d5d906 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -42,7 +42,7 @@ use tokio::{select, time}; use crate::barrier::{BarrierScheduler, Command}; use crate::manager::MetadataManager; -use crate::model::{ActorId, FragmentId, TableFragments}; +use crate::model::{ActorId, FragmentId, StreamJobFragments}; use crate::rpc::metrics::MetaMetrics; use crate::MetaResult; @@ -763,7 +763,7 @@ impl SourceManager { } /// For dropping MV. - pub async fn drop_source_fragments_vec(&self, table_fragments: &[TableFragments]) { + pub async fn drop_source_fragments_vec(&self, table_fragments: &[StreamJobFragments]) { let mut core = self.core.lock().await; // Extract the fragments that include source operators. diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 2d7ab47a7c784..82e41ffedb419 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -35,7 +35,7 @@ use crate::barrier::{ }; use crate::error::bail_invalid_parameter; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob}; -use crate::model::{ActorId, FragmentId, TableFragments, TableParallelism}; +use crate::model::{ActorId, FragmentId, StreamJobFragments, TableParallelism}; use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; @@ -77,7 +77,7 @@ pub struct CreateStreamingJobContext { pub ddl_type: DdlType, /// Context provided for potential replace table, typically used when sinking into a table. - pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, TableFragments)>, + pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, StreamJobFragments)>, pub snapshot_backfill_info: Option, @@ -169,7 +169,7 @@ type CreatingStreamingJobInfoRef = Arc; /// Note: for better readability, keep this struct complete and immutable once created. pub struct ReplaceTableContext { /// The old table fragments to be replaced. - pub old_table_fragments: TableFragments, + pub old_table_fragments: StreamJobFragments, /// The updates to be applied to the downstream chain actors. Used for schema change. pub merge_updates: Vec, @@ -234,10 +234,10 @@ impl GlobalStreamManager { /// 4. Store related meta data. 
pub async fn create_streaming_job( self: &Arc, - table_fragments: TableFragments, + table_fragments: StreamJobFragments, ctx: CreateStreamingJobContext, ) -> MetaResult { - let table_id = table_fragments.table_id(); + let table_id = table_fragments.stream_job_id(); let database_id = ctx.streaming_job.database_id().into(); let (sender, mut receiver) = tokio::sync::mpsc::channel(10); let execution = StreamingJobExecution::new(table_id, sender.clone()); @@ -324,7 +324,7 @@ impl GlobalStreamManager { async fn create_streaming_job_impl( &self, - table_fragments: TableFragments, + table_fragments: StreamJobFragments, CreateStreamingJobContext { streaming_job, dispatchers, @@ -342,7 +342,7 @@ impl GlobalStreamManager { let mut replace_table_id = None; tracing::debug!( - table_id = %table_fragments.table_id(), + table_id = %table_fragments.stream_job_id(), "built actors finished" ); @@ -354,7 +354,7 @@ impl GlobalStreamManager { .prepare_streaming_job(&table_fragments, &streaming_job, true) .await?; - let tmp_table_id = table_fragments.table_id(); + let tmp_table_id = table_fragments.stream_job_id(); let init_split_assignment = self.source_manager.allocate_splits(&tmp_table_id).await?; replace_table_id = Some(tmp_table_id); @@ -370,7 +370,7 @@ impl GlobalStreamManager { }); } - let table_id = table_fragments.table_id(); + let table_id = table_fragments.stream_job_id(); // Here we need to consider: // - Shared source @@ -459,7 +459,7 @@ impl GlobalStreamManager { pub async fn replace_table( &self, - table_fragments: TableFragments, + table_fragments: StreamJobFragments, ReplaceTableContext { old_table_fragments, merge_updates, @@ -469,7 +469,7 @@ impl GlobalStreamManager { .. }: ReplaceTableContext, ) -> MetaResult<()> { - let tmp_table_id = table_fragments.table_id(); + let tmp_table_id = table_fragments.stream_job_id(); let init_split_assignment = self.source_manager.allocate_splits(&tmp_table_id).await?; self.barrier_scheduler diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 7c7aed920ca3b..814ea3c435273 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -39,7 +39,7 @@ use risingwave_pb::stream_plan::{ use crate::controller::cluster::StreamingClusterInfo; use crate::manager::{MetaSrvEnv, StreamingJob}; -use crate::model::TableFragments; +use crate::model::StreamJobFragments; use crate::stream::{ ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, StreamFragmentGraph, }; @@ -459,7 +459,7 @@ async fn test_graph_builder() -> MetaResult<()> { let ActorGraphBuildResult { graph, .. } = actor_graph_builder.generate_graph(&env, &job, expr_context)?; - let table_fragments = TableFragments::for_test(TableId::default(), graph); + let table_fragments = StreamJobFragments::for_test(TableId::default(), graph); let actors = table_fragments.actors(); let mview_actor_ids = table_fragments.mview_actor_ids();
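
---

As a quick illustration of the naming distinction this patch documents (a hedged, self-contained sketch, not RisingWave code: the struct and field names below are hypothetical simplifications of `StreamJobFragments` and the catalog `Table`): one streaming job owns several storage tables, each of which is a `Table` in the catalog sense, while only one of them (if any) corresponds to the user-visible object, and the proto field `table_id` on `TableFragments` is really the streaming job's id.

```rust
use std::collections::BTreeMap;

/// Stand-in for a catalog `Table`: anything that lives in storage and can be `SELECT`ed.
#[derive(Debug)]
struct TableCatalogEntry {
    table_id: u32,
    name: String,
    /// e.g. "Table", "MaterializedView", "Index", "Internal"
    table_type: &'static str,
}

/// Stand-in for `StreamJobFragments` (formerly `TableFragments`):
/// all fragments and state tables of one streaming job.
#[derive(Debug)]
struct StreamJobFragmentsSketch {
    /// The id of the streaming job (what the proto field `table_id` actually means).
    stream_job_id: u32,
    /// Storage tables owned by this job's fragments, keyed by table id.
    state_tables: BTreeMap<u32, TableCatalogEntry>,
}

impl StreamJobFragmentsSketch {
    fn stream_job_id(&self) -> u32 {
        self.stream_job_id
    }

    /// Internal state tables: every owned storage table except the one whose
    /// id equals the job id (mirrors the spirit of `internal_table_ids`).
    fn internal_table_ids(&self) -> Vec<u32> {
        self.state_tables
            .keys()
            .copied()
            .filter(|&id| id != self.stream_job_id)
            .collect()
    }
}

fn main() {
    // A user runs `CREATE MATERIALIZED VIEW`: one streaming job, several storage tables.
    let job = StreamJobFragmentsSketch {
        stream_job_id: 42,
        state_tables: BTreeMap::from([
            (42, TableCatalogEntry {
                table_id: 42,
                name: "mv".into(),
                table_type: "MaterializedView",
            }),
            (43, TableCatalogEntry {
                table_id: 43,
                name: "__internal_mv_agg_state".into(),
                table_type: "Internal",
            }),
        ]),
    };
    println!(
        "job {} owns internal tables {:?}",
        job.stream_job_id(),
        job.internal_table_ids()
    );
}
```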