refactor: clarify the meaning of table in TableCatalog and TableFragments

Signed-off-by: xxchan <[email protected]>
xxchan committed Nov 21, 2024
1 parent 4aaa3eb commit c78ebdd
Showing 20 changed files with 100 additions and 99 deletions.
proto/catalog.proto (5 additions, 0 deletions)
@@ -316,6 +316,11 @@ message Function {
message AggregateFunction {}
}

+// Includes full information about a table.
+//
+// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
+// It is not the same as a user-side table created by `CREATE TABLE`.
+//
// See `TableCatalog` struct in frontend crate for more information.
message Table {
enum TableType {
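The distinction drawn in the new comment can be made concrete in a few lines. A minimal Rust sketch using the `TableType` variants that appear later in this diff; both helper functions are illustrative and not part of the commit:

// `TableType` as defined in src/frontend/src/catalog/table_catalog.rs.
enum TableType {
    Table,
    MaterializedView,
    Index,
    Internal,
}

// Only `TableType::Table` comes from a user-side `CREATE TABLE` ...
fn is_user_created_table(table_type: &TableType) -> bool {
    matches!(table_type, TableType::Table)
}

// ... but every variant is "a table in storage" and can therefore be `SELECT`ed.
fn is_table_in_storage(_table_type: &TableType) -> bool {
    true
}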
proto/ddl_service.proto (1 addition, 1 deletion)
@@ -349,7 +349,7 @@ message DropIndexResponse {
}

message ReplaceTablePlan {
-// The new table catalog, with the correct table ID and a new version.
+// The new table catalog, with the correct (old) table ID and a new version.
// If the new version does not match the subsequent version in the meta service's
// catalog, this request will be rejected.
catalog.Table table = 1;
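The rejection rule in this comment is an optimistic-concurrency check. A hedged sketch of what it amounts to, assuming the meta service tracks a monotonically increasing version per table (names are illustrative, not the actual meta-service API):

fn validate_replace_table(catalog_version: u64, request_version: u64) -> Result<(), String> {
    // The request must carry exactly the next version. Anything else means the
    // client planned against a stale catalog and has to retry.
    if request_version != catalog_version + 1 {
        return Err(format!(
            "version mismatch: catalog at {catalog_version}, request carries {request_version}"
        ));
    }
    Ok(())
}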
proto/meta.proto (4 additions, 1 deletion)
@@ -36,7 +36,9 @@ service HeartbeatService {
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
}

-// Fragments of a Streaming Job
+// Fragments of a Streaming Job.
+// It's for all kinds of streaming jobs, and ideally should be called `StreamingJobFragments`.
+// It's not the same as a storage table correlated with a `TableCatalog`.
message TableFragments {
// The state of the fragments of this table
enum State {
@@ -96,6 +98,7 @@ message TableFragments {
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 8;
}
+// The id of the streaming job.
uint32 table_id = 1;
State state = 2;
map<uint32, Fragment> fragments = 3;
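The naming problem the comment describes is that `table_id` here identifies a streaming job, while the job's fragments own many state tables that do correlate with `TableCatalog` entries. A simplified Rust sketch of that relationship (field names assumed from this diff, not the full message):

struct StreamJobFragments {
    // `table_id = 1` in the proto: the id of the streaming job, not of a storage table.
    stream_job_id: u32,
    // State tables owned by the job's fragments; these are the ids that correlate
    // with `TableCatalog` entries in storage.
    state_table_ids: Vec<u32>,
}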
src/frontend/src/catalog/table_catalog.rs (5 additions, 6 deletions)
@@ -35,14 +35,12 @@ use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::user::UserId;

-/// Includes full information about a table.
+/// `TableCatalog` includes full information about a table.
///
-/// Currently, it can be either:
-/// - a table or a source
-/// - a materialized view
-/// - an index
+/// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
+/// It is not the same as a user-side table created by `CREATE TABLE`.
///
-/// Use `self.table_type()` to determine the type of the table.
+/// Use [`Self::table_type()`] to determine the [`TableType`] of the table.
///
/// # Column ID & Column Index
///
@@ -191,6 +189,7 @@ pub enum TableType {
/// Tables created by `CREATE MATERIALIZED VIEW`.
MaterializedView,
/// Tables serving as index for `TableType::Table` or `TableType::MaterializedView`.
+/// An index has both a `TableCatalog` and an `IndexCatalog`.
Index,
/// Internal tables for executors.
Internal,
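Since one `TableCatalog` type covers several kinds of storage tables, callers branch on `table_type()`. A hypothetical dispatch restating the variant docs above, reusing the `TableType` enum sketched after the catalog.proto hunk; the `describe` helper is not in the codebase:

fn describe(table_type: &TableType) -> &'static str {
    match table_type {
        TableType::Table => "user table from `CREATE TABLE`, backed by a table in storage",
        TableType::MaterializedView => "materialized view, backed by a table in storage",
        TableType::Index => "index; has both a `TableCatalog` and an `IndexCatalog`",
        TableType::Internal => "internal state table of a streaming executor",
    }
}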
src/meta/src/barrier/checkpoint/control.rs (2 additions, 2 deletions)
@@ -232,7 +232,7 @@ impl CheckpointControl {
.values()
{
progress.extend([(
-creating_job.info.table_fragments.table_id().table_id,
+creating_job.info.table_fragments.stream_job_id().table_id,
creating_job.gen_ddl_progress(),
)]);
}
@@ -830,7 +830,7 @@ impl DatabaseCheckpointControl {
.expect("checked Some")
.to_mutation(None)
.expect("should have some mutation in `CreateStreamingJob` command");
-let job_id = info.table_fragments.table_id();
+let job_id = info.table_fragments.stream_job_id();
control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?;
self.creating_streaming_job_controls.insert(
job_id,
src/meta/src/barrier/checkpoint/creating_job/mod.rs (8 additions, 8 deletions)
@@ -61,7 +61,7 @@ impl CreatingStreamingJobControl {
initial_mutation: Mutation,
) -> Self {
info!(
-table_id = info.table_fragments.table_id().table_id,
+table_id = info.table_fragments.stream_job_id().table_id,
definition = info.definition,
"new creating job"
);
@@ -70,7 +70,7 @@
create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat);
let fragment_infos: HashMap<_, _> = info.new_fragment_info().collect();

-let table_id = info.table_fragments.table_id();
+let table_id = info.table_fragments.stream_job_id();
let table_id_str = format!("{}", table_id.table_id);

let actors_to_create = info.table_fragments.actors_to_create();
@@ -121,7 +121,7 @@ impl CreatingStreamingJobControl {
} else {
let progress = create_mview_tracker
.gen_ddl_progress()
-.remove(&self.info.table_fragments.table_id().table_id)
+.remove(&self.info.table_fragments.stream_job_id().table_id)
.expect("should exist");
format!("Snapshot [{}]", progress.progress)
}
@@ -143,7 +143,7 @@
}
};
DdlProgress {
-id: self.info.table_fragments.table_id().table_id as u64,
+id: self.info.table_fragments.stream_job_id().table_id as u64,
statement: self.info.definition.clone(),
progress,
}
@@ -202,7 +202,7 @@ impl CreatingStreamingJobControl {
command: Option<&Command>,
barrier_info: &BarrierInfo,
) -> MetaResult<()> {
-let table_id = self.info.table_fragments.table_id();
+let table_id = self.info.table_fragments.stream_job_id();
let start_consume_upstream =
if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
jobs_to_merge.contains_key(&table_id)
@@ -211,7 +211,7 @@
};
if start_consume_upstream {
info!(
-table_id = self.info.table_fragments.table_id().table_id,
+table_id = self.info.table_fragments.stream_job_id().table_id,
prev_epoch = barrier_info.prev_epoch(),
"start consuming upstream"
);
@@ -235,7 +235,7 @@
{
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
-self.info.table_fragments.table_id(),
+self.info.table_fragments.stream_job_id(),
control_stream_manager,
&mut self.barrier_control,
&self.graph_info,
@@ -260,7 +260,7 @@
let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress);
self.barrier_control.collect(epoch, worker_id, resp);
if let Some(prev_barriers_to_inject) = prev_barriers_to_inject {
-let table_id = self.info.table_fragments.table_id();
+let table_id = self.info.table_fragments.stream_job_id();
for info in prev_barriers_to_inject {
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
src/meta/src/barrier/command.rs (8 additions, 8 deletions)
@@ -49,7 +49,7 @@ use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{DdlType, StreamingJob};
-use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
+use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
@@ -88,8 +88,8 @@ pub struct Reschedule {
/// Used for `ALTER TABLE` ([`Command::ReplaceTable`]) and sink into table ([`Command::CreateStreamingJob`]).
#[derive(Debug, Clone)]
pub struct ReplaceTablePlan {
-pub old_table_fragments: TableFragments,
-pub new_table_fragments: TableFragments,
+pub old_table_fragments: StreamJobFragments,
+pub new_table_fragments: StreamJobFragments,
pub merge_updates: Vec<MergeUpdate>,
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
/// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
@@ -149,7 +149,7 @@ impl ReplaceTablePlan {
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
#[educe(Debug(ignore))]
-pub table_fragments: TableFragments,
+pub table_fragments: StreamJobFragments,
/// Refer to the doc on [`crate::manager::MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
pub upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
@@ -309,9 +309,9 @@ impl Command {
Self::Resume(reason)
}

-pub fn cancel(table_fragments: &TableFragments) -> Self {
+pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
Self::DropStreamingJobs {
-table_fragments_ids: HashSet::from_iter([table_fragments.table_id()]),
+table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
actors: table_fragments.actor_ids(),
unregistered_state_table_ids: table_fragments
.all_table_ids()
@@ -668,7 +668,7 @@ impl Command {
.upstream_mv_table_ids
.iter()
.map(|table_id| SubscriptionUpstreamInfo {
-subscriber_id: table_fragments.table_id().table_id,
+subscriber_id: table_fragments.stream_job_id().table_id,
upstream_mv_table_id: table_id.table_id,
})
.collect()
@@ -947,7 +947,7 @@ impl Command {
}

fn generate_update_mutation_for_replace_table(
-old_table_fragments: &TableFragments,
+old_table_fragments: &StreamJobFragments,
merge_updates: &[MergeUpdate],
dispatchers: &HashMap<ActorId, Vec<Dispatcher>>,
init_split_assignment: &SplitAssignment,
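The repeated change in this file is mechanical: `model::TableFragments` becomes `model::StreamJobFragments`, and its `table_id()` accessor becomes `stream_job_id()`. A self-contained sketch of the renamed accessor, with stand-in types (the real `TableId` lives in `risingwave_common::catalog`):

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct TableId {
    table_id: u32,
}

struct StreamJobFragments {
    stream_job_id: u32, // `table_id = 1` in the proto
}

impl StreamJobFragments {
    // Formerly `table_id()`; renamed because the id identifies a streaming job,
    // not a storage table.
    fn stream_job_id(&self) -> TableId {
        TableId { table_id: self.stream_job_id }
    }
}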
src/meta/src/barrier/context/context_impl.rs (3 additions, 3 deletions)
@@ -173,7 +173,7 @@ impl CommandContext {
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
-table_fragments.table_id().table_id as _,
+table_fragments.stream_job_id().table_id as _,
table_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
@@ -191,7 +191,7 @@
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
-new_table_fragments.table_id().table_id as _,
+new_table_fragments.stream_job_id().table_id as _,
new_table_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
@@ -235,7 +235,7 @@ impl CommandContext {
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
-new_table_fragments.table_id().table_id as _,
+new_table_fragments.stream_job_id().table_id as _,
new_table_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
src/meta/src/barrier/context/recovery.rs (3 additions, 3 deletions)
@@ -33,7 +33,7 @@ use crate::barrier::info::InflightDatabaseInfo;
use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::ActiveStreamingWorkerNodes;
-use crate::model::{ActorId, TableFragments, TableParallelism};
+use crate::model::{ActorId, StreamJobFragments, TableParallelism};
use crate::stream::{RescheduleOptions, TableResizePolicy};
use crate::{model, MetaResult};

@@ -66,7 +66,7 @@ impl GlobalBarrierWorkerContextImpl {

async fn recover_background_mv_progress(
&self,
-) -> MetaResult<HashMap<TableId, (String, TableFragments)>> {
+) -> MetaResult<HashMap<TableId, (String, StreamJobFragments)>> {
let mgr = &self.metadata_manager;
let mviews = mgr
.catalog_controller
@@ -80,7 +80,7 @@ impl GlobalBarrierWorkerContextImpl {
.catalog_controller
.get_job_fragments_by_id(mview.table_id)
.await?;
-let table_fragments = TableFragments::from_protobuf(table_fragments);
+let table_fragments = StreamJobFragments::from_protobuf(table_fragments);
if table_fragments.tracking_progress_actor_ids().is_empty() {
// If there's no tracking actor in the mview, we can finish the job directly.
mgr.catalog_controller
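The recovery branch visible here makes one decision per background materialized view: if no actor is still tracked for backfill progress, the job is finished directly; otherwise it re-enters the progress tracker. A small sketch of that decision, with simplified types:

enum RecoveredJobAction {
    // No tracking actors remain, so the job can be marked finished immediately.
    FinishDirectly,
    // Backfill is still in flight; rebuild tracking so progress reporting resumes.
    ResumeTracking,
}

fn classify_recovered_job(tracking_progress_actor_ids: &[u32]) -> RecoveredJobAction {
    if tracking_progress_actor_ids.is_empty() {
        RecoveredJobAction::FinishDirectly
    } else {
        RecoveredJobAction::ResumeTracking
    }
}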
src/meta/src/barrier/mod.rs (2 additions, 2 deletions)
@@ -26,7 +26,7 @@ use tokio::sync::oneshot::Sender;
use self::notifier::Notifier;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
use crate::manager::ActiveStreamingWorkerNodes;
-use crate::model::{ActorId, TableFragments};
+use crate::model::{ActorId, StreamJobFragments};
use crate::{MetaError, MetaResult};

mod checkpoint;
@@ -105,7 +105,7 @@ struct BarrierWorkerRuntimeInfoSnapshot {
subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
stream_actors: HashMap<ActorId, StreamActor>,
source_splits: HashMap<ActorId, Vec<SplitImpl>>,
-background_jobs: HashMap<TableId, (String, TableFragments)>,
+background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
hummock_version_stats: HummockVersionStats,
}

src/meta/src/barrier/progress.rs (5 additions, 5 deletions)
@@ -30,7 +30,7 @@ use crate::barrier::{
Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan,
};
use crate::manager::{DdlType, MetadataManager};
-use crate::model::{ActorId, BackfillUpstreamType, TableFragments};
+use crate::model::{ActorId, BackfillUpstreamType, StreamJobFragments};
use crate::MetaResult;

type ConsumedRows = u64;
@@ -246,7 +246,7 @@ impl TrackingJob {

pub(crate) fn table_to_create(&self) -> TableId {
match self {
-TrackingJob::New(command) => command.info.table_fragments.table_id(),
+TrackingJob::New(command) => command.info.table_fragments.stream_job_id(),
TrackingJob::Recovered(recovered) => (recovered.id as u32).into(),
}
}
@@ -258,7 +258,7 @@ impl std::fmt::Debug for TrackingJob {
TrackingJob::New(command) => write!(
f,
"TrackingJob::New({:?})",
-command.info.table_fragments.table_id()
+command.info.table_fragments.stream_job_id()
),
TrackingJob::Recovered(recovered) => {
write!(f, "TrackingJob::RecoveredV2({:?})", recovered.id)
@@ -302,7 +302,7 @@ impl CreateMviewProgressTracker {
/// 1. `CreateMviewProgress`.
/// 2. `Backfill` position.
pub fn recover(
-mview_map: HashMap<TableId, (String, TableFragments)>,
+mview_map: HashMap<TableId, (String, StreamJobFragments)>,
version_stats: &HummockVersionStats,
) -> Self {
let mut actor_map = HashMap::new();
@@ -523,7 +523,7 @@ impl CreateMviewProgressTracker {
..
} = &info;

-let creating_mv_id = table_fragments.table_id();
+let creating_mv_id = table_fragments.stream_job_id();

let (upstream_mv_count, upstream_total_key_count, ddl_type, create_type) = {
// Keep track of how many times each upstream MV appears.
src/meta/src/barrier/schedule.rs (1 addition, 1 deletion)
@@ -198,7 +198,7 @@ impl BarrierScheduler {

if let Some(idx) = queue.queue.iter().position(|scheduled| {
if let Command::CreateStreamingJob { info, .. } = &scheduled.command
-&& info.table_fragments.table_id() == table_id
+&& info.table_fragments.stream_job_id() == table_id
{
true
} else {
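The predicate in this hunk keeps the surrounding `if let ... { true } else { false }` shape; the same check can be phrased with `matches!` and a guard. A self-contained sketch with stand-in types, not a change this commit makes:

#[derive(Clone, Copy, PartialEq, Eq)]
struct TableId(u32);

enum Command {
    CreateStreamingJob { stream_job_id: TableId },
    Other,
}

fn position_of_create_job(queue: &[Command], table_id: TableId) -> Option<usize> {
    queue.iter().position(|command| {
        // Equivalent to the `if let ... { true } else { false }` form in the diff.
        matches!(command, Command::CreateStreamingJob { stream_job_id } if *stream_job_id == table_id)
    })
}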
src/meta/src/controller/streaming_job.rs (2 additions, 2 deletions)
@@ -72,7 +72,7 @@ use crate::controller::utils::{
};
use crate::controller::ObjectModel;
use crate::manager::{NotificationVersion, StreamingJob};
-use crate::model::{StreamContext, TableFragments, TableParallelism};
+use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

@@ -403,7 +403,7 @@ impl CatalogController {
// making them the source of truth and performing a full replacement for those in the meta store?
pub async fn prepare_streaming_job(
&self,
-table_fragments: &TableFragments,
+table_fragments: &StreamJobFragments,
streaming_job: &StreamingJob,
for_replace: bool,
) -> MetaResult<()> {
(7 more changed files not shown)
