refactor: clarify the meaning of table in TableCatalog and TableFragments #19510

Merged
5 changes: 5 additions & 0 deletions proto/catalog.proto
@@ -316,6 +316,11 @@ message Function {
message AggregateFunction {}
}

// Includes full information about a table.
//
// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
// It is not the same as a user-side table created by `CREATE TABLE`.
//
// See `TableCatalog` struct in frontend crate for more information.
message Table {
enum TableType {
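For readers new to the catalog model, the comment above can be made concrete with a small sketch (not part of this diff). The `table_type()` accessor and the enum variants are assumptions about the prost-generated bindings for the `Table` message; only the naming intent comes from the new doc comment.

```rust
// Sketch only, not part of this PR. Assumes the prost-generated bindings for
// `catalog.proto` expose a `table_type()` accessor and a `TableType` enum with
// the variants below.
use risingwave_pb::catalog::{table::TableType, Table};

/// True only for user-side tables created by `CREATE TABLE`. Materialized
/// views, indexes, and internal executor tables are also catalog `Table`s
/// (they all correspond to a table in storage and can be `SELECT`ed), but
/// they are not of this type.
fn is_user_facing_table(table: &Table) -> bool {
    table.table_type() == TableType::Table
}
```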
2 changes: 1 addition & 1 deletion proto/ddl_service.proto
@@ -349,7 +349,7 @@ message DropIndexResponse {
}

message ReplaceTablePlan {
// The new table catalog, with the correct table ID and a new version.
// The new table catalog, with the correct (old) table ID and a new version.
// If the new version does not match the subsequent version in the meta service's
// catalog, this request will be rejected.
catalog.Table table = 1;
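To spell out the rule in the comment above: the plan reuses the old table ID while bumping the catalog version, and the meta service rejects anything that is not the immediate next version. Below is a minimal, self-contained sketch of that check; the struct and function names are illustrative and not the actual meta-service code.

```rust
// Illustrative sketch of the version rule described above; not the real
// meta-service implementation.
struct CatalogVersion {
    table_id: u32,
    version: u64,
}

fn validate_replace(current: &CatalogVersion, incoming: &CatalogVersion) -> Result<(), String> {
    // The replacement must target the same (old) table ID ...
    if incoming.table_id != current.table_id {
        return Err("table ID of the replacement catalog does not match".into());
    }
    // ... and carry exactly the next version known to the meta service.
    if incoming.version != current.version + 1 {
        return Err(format!(
            "version mismatch: meta has {}, request carries {}",
            current.version, incoming.version
        ));
    }
    Ok(())
}
```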
5 changes: 4 additions & 1 deletion proto/meta.proto
@@ -36,7 +36,9 @@ service HeartbeatService {
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
}

// Fragments of a Streaming Job
// Fragments of a Streaming Job.
// It applies to all kinds of streaming jobs and would ideally be named `StreamingJobFragments`.
// It is not the same as a storage table associated with a `TableCatalog`.
message TableFragments {
// The state of the fragments of this table
enum State {
@@ -96,6 +98,7 @@ message TableFragments {
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 8;
}
// The id of the streaming job.
uint32 table_id = 1;
State state = 2;
map<uint32, Fragment> fragments = 3;
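The point of the comment is that `table_id` here identifies a streaming job, not a storage table. Below is a minimal sketch of how such an id can be surfaced under an honest name (the accessor the Rust changes in this PR migrate to is `stream_job_id()`); the struct layout is an assumption for illustration, not the actual meta `model` type.

```rust
// Sketch only: surfaces the proto `table_id` field under a name that reflects
// what it actually identifies. The real meta `model::StreamJobFragments` holds
// more state; only the id accessor is shown here.
use risingwave_common::catalog::TableId;
use risingwave_pb::meta::TableFragments as PbTableFragments;

pub struct StreamJobFragments {
    pb: PbTableFragments,
}

impl StreamJobFragments {
    /// The id of the streaming job. The underlying proto field keeps the
    /// historical name `table_id` for compatibility.
    pub fn stream_job_id(&self) -> TableId {
        TableId::new(self.pb.table_id)
    }
}
```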
11 changes: 5 additions & 6 deletions src/frontend/src/catalog/table_catalog.rs
@@ -35,14 +35,12 @@ use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::user::UserId;

/// Includes full information about a table.
/// `TableCatalog` includes full information about a table.
///
/// Currently, it can be either:
/// - a table or a source
/// - a materialized view
/// - an index
/// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
/// It is not the same as a user-side table created by `CREATE TABLE`.
///
/// Use `self.table_type()` to determine the type of the table.
/// Use [`Self::table_type()`] to determine the [`TableType`] of the table.
///
/// # Column ID & Column Index
///
@@ -191,6 +189,7 @@ pub enum TableType {
/// Tables created by `CREATE MATERIALIZED VIEW`.
MaterializedView,
/// Tables serving as index for `TableType::Table` or `TableType::MaterializedView`.
/// An index has both a `TableCatalog` and an `IndexCatalog`.
Index,
/// Internal tables for executors.
Internal,
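As a usage note (not part of the diff), the `TableType` returned by `TableCatalog::table_type()` is what distinguishes user tables from the other storage tables described in the new doc comment. The helper below is hypothetical; the import path and the assumption that `table_type()` returns the enum by value are not taken from this PR.

```rust
// Hypothetical helper built on the enum documented above; not part of this PR.
// The import path is an assumption about the frontend crate's module layout.
use risingwave_frontend::catalog::table_catalog::TableType;

/// A human-readable label per `TableType`, following the doc comments above.
fn describe(table_type: TableType) -> &'static str {
    match table_type {
        // User-side tables created by `CREATE TABLE`.
        TableType::Table => "user table",
        // Created by `CREATE MATERIALIZED VIEW`.
        TableType::MaterializedView => "materialized view",
        // Backed by both a `TableCatalog` and an `IndexCatalog`.
        TableType::Index => "index",
        // Internal state tables of streaming executors.
        TableType::Internal => "internal table",
    }
}
```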
10 changes: 7 additions & 3 deletions src/meta/src/barrier/checkpoint/control.rs
@@ -232,7 +232,11 @@ impl CheckpointControl {
.values()
{
progress.extend([(
creating_job.info.table_fragments.table_id().table_id,
creating_job
.info
.stream_job_fragments
.stream_job_id()
.table_id,
creating_job.gen_ddl_progress(),
)]);
}
@@ -676,7 +680,7 @@ impl DatabaseCheckpointControl {
resps,
self.creating_streaming_job_controls[&table_id]
.info
.table_fragments
.stream_job_fragments
.all_table_ids()
.map(TableId::new),
is_first_time,
@@ -830,7 +834,7 @@ impl DatabaseCheckpointControl {
.expect("checked Some")
.to_mutation(None)
.expect("should have some mutation in `CreateStreamingJob` command");
let job_id = info.table_fragments.table_id();
let job_id = info.stream_job_fragments.stream_job_id();
control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?;
self.creating_streaming_job_controls.insert(
job_id,
20 changes: 10 additions & 10 deletions src/meta/src/barrier/checkpoint/creating_job/mod.rs
@@ -61,19 +61,19 @@ impl CreatingStreamingJobControl {
initial_mutation: Mutation,
) -> Self {
info!(
table_id = info.table_fragments.table_id().table_id,
table_id = info.stream_job_fragments.stream_job_id().table_id,
definition = info.definition,
"new creating job"
);
let snapshot_backfill_actors = info.table_fragments.snapshot_backfill_actor_ids();
let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
let mut create_mview_tracker = CreateMviewProgressTracker::default();
create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat);
let fragment_infos: HashMap<_, _> = info.new_fragment_info().collect();

let table_id = info.table_fragments.table_id();
let table_id = info.stream_job_fragments.stream_job_id();
let table_id_str = format!("{}", table_id.table_id);

let actors_to_create = info.table_fragments.actors_to_create();
let actors_to_create = info.stream_job_fragments.actors_to_create();
let graph_info = InflightStreamingJobInfo {
job_id: table_id,
fragment_infos,
@@ -121,7 +121,7 @@ impl CreatingStreamingJobControl {
} else {
let progress = create_mview_tracker
.gen_ddl_progress()
.remove(&self.info.table_fragments.table_id().table_id)
.remove(&self.info.stream_job_fragments.stream_job_id().table_id)
.expect("should exist");
format!("Snapshot [{}]", progress.progress)
}
@@ -143,7 +143,7 @@
}
};
DdlProgress {
id: self.info.table_fragments.table_id().table_id as u64,
id: self.info.stream_job_fragments.stream_job_id().table_id as u64,
statement: self.info.definition.clone(),
progress,
}
@@ -202,7 +202,7 @@ impl CreatingStreamingJobControl {
command: Option<&Command>,
barrier_info: &BarrierInfo,
) -> MetaResult<()> {
let table_id = self.info.table_fragments.table_id();
let table_id = self.info.stream_job_fragments.stream_job_id();
let start_consume_upstream =
if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
jobs_to_merge.contains_key(&table_id)
@@ -211,7 +211,7 @@
};
if start_consume_upstream {
info!(
table_id = self.info.table_fragments.table_id().table_id,
table_id = self.info.stream_job_fragments.stream_job_id().table_id,
prev_epoch = barrier_info.prev_epoch(),
"start consuming upstream"
);
@@ -235,7 +235,7 @@
{
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
self.info.table_fragments.table_id(),
self.info.stream_job_fragments.stream_job_id(),
control_stream_manager,
&mut self.barrier_control,
&self.graph_info,
@@ -260,7 +260,7 @@
let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress);
self.barrier_control.collect(epoch, worker_id, resp);
if let Some(prev_barriers_to_inject) = prev_barriers_to_inject {
let table_id = self.info.table_fragments.table_id();
let table_id = self.info.stream_job_fragments.stream_job_id();
for info in prev_barriers_to_inject {
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
94 changes: 49 additions & 45 deletions src/meta/src/barrier/command.rs
@@ -49,7 +49,7 @@ use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{DdlType, StreamingJob};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
@@ -88,8 +88,8 @@ pub struct Reschedule {
/// Used for `ALTER TABLE` ([`Command::ReplaceTable`]) and sink into table ([`Command::CreateStreamingJob`]).
#[derive(Debug, Clone)]
pub struct ReplaceTablePlan {
pub old_table_fragments: TableFragments,
pub new_table_fragments: TableFragments,
pub old_fragments: StreamJobFragments,
pub new_fragments: StreamJobFragments,
pub merge_updates: Vec<MergeUpdate>,
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
/// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
@@ -107,7 +107,7 @@ pub struct ReplaceTablePlan {
impl ReplaceTablePlan {
fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
let mut fragment_changes = HashMap::new();
for fragment in self.new_table_fragments.fragments.values() {
for fragment in self.new_fragments.fragments.values() {
let fragment_change = CommandFragmentChanges::NewFragment(
self.streaming_job.id().into(),
InflightFragmentInfo {
@@ -117,7 +117,7 @@ impl ReplaceTablePlan {
.map(|actor| {
(
actor.actor_id,
self.new_table_fragments
self.new_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
@@ -136,7 +136,7 @@ impl ReplaceTablePlan {
.insert(fragment.fragment_id, fragment_change)
.is_none());
}
for fragment in self.old_table_fragments.fragments.values() {
for fragment in self.old_fragments.fragments.values() {
assert!(fragment_changes
.insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
.is_none());
@@ -149,7 +149,7 @@
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
#[educe(Debug(ignore))]
pub table_fragments: TableFragments,
pub stream_job_fragments: StreamJobFragments,
/// Refer to the doc on [`crate::manager::MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
pub upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
@@ -165,32 +165,36 @@ impl CreateStreamingJobCommandInfo {
pub(super) fn new_fragment_info(
&self,
) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
self.table_fragments.fragments.values().map(|fragment| {
(
fragment.fragment_id,
InflightFragmentInfo {
actors: fragment
.actors
.iter()
.map(|actor| {
(
actor.actor_id,
self.table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.worker_id() as WorkerId,
)
})
.collect(),
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| TableId::new(*table_id))
.collect(),
},
)
})
self.stream_job_fragments
.fragments
.values()
.map(|fragment| {
(
fragment.fragment_id,
InflightFragmentInfo {
actors: fragment
.actors
.iter()
.map(|actor| {
(
actor.actor_id,
self.stream_job_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.worker_id()
as WorkerId,
)
})
.collect(),
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| TableId::new(*table_id))
.collect(),
},
)
})
}
}

@@ -309,9 +313,9 @@ impl Command {
Self::Resume(reason)
}

pub fn cancel(table_fragments: &TableFragments) -> Self {
pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
Self::DropStreamingJobs {
table_fragments_ids: HashSet::from_iter([table_fragments.table_id()]),
table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
actors: table_fragments.actor_ids(),
unregistered_state_table_ids: table_fragments
.all_table_ids()
@@ -512,7 +516,7 @@ impl CommandContext {
&self.command
&& !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
{
let table_fragments = &info.table_fragments;
let table_fragments = &info.stream_job_fragments;
let mut table_ids: HashSet<_> = table_fragments
.internal_table_ids()
.into_iter()
@@ -637,7 +641,7 @@ impl Command {
Command::CreateStreamingJob {
info:
CreateStreamingJobCommandInfo {
table_fragments,
stream_job_fragments: table_fragments,
dispatchers,
init_split_assignment: split_assignment,
..
@@ -668,7 +672,7 @@ impl Command {
.upstream_mv_table_ids
.iter()
.map(|table_id| SubscriptionUpstreamInfo {
subscriber_id: table_fragments.table_id().table_id,
subscriber_id: table_fragments.stream_job_id().table_id,
upstream_mv_table_id: table_id.table_id,
})
.collect()
@@ -685,8 +689,8 @@
}));

if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan {
old_table_fragments,
new_table_fragments: _,
old_fragments: old_table_fragments,
new_fragments: _,
merge_updates,
dispatchers,
init_split_assignment,
@@ -728,7 +732,7 @@
}

Command::ReplaceTable(ReplaceTablePlan {
old_table_fragments,
old_fragments: old_table_fragments,
merge_updates,
dispatchers,
init_split_assignment,
@@ -916,14 +920,14 @@ impl Command {
let mut map = match job_type {
CreateStreamingJobType::Normal => HashMap::new(),
CreateStreamingJobType::SinkIntoTable(replace_table) => {
replace_table.new_table_fragments.actors_to_create()
replace_table.new_fragments.actors_to_create()
}
CreateStreamingJobType::SnapshotBackfill(_) => {
// for snapshot backfill, the actors to create is measured separately
return None;
}
};
for (worker_id, new_actors) in info.table_fragments.actors_to_create() {
for (worker_id, new_actors) in info.stream_job_fragments.actors_to_create() {
map.entry(worker_id).or_default().extend(new_actors)
}
Some(map)
@@ -940,14 +944,14 @@ impl Command {
Some(map)
}
Command::ReplaceTable(replace_table) => {
Some(replace_table.new_table_fragments.actors_to_create())
Some(replace_table.new_fragments.actors_to_create())
}
_ => None,
}
}

fn generate_update_mutation_for_replace_table(
old_table_fragments: &TableFragments,
old_table_fragments: &StreamJobFragments,
merge_updates: &[MergeUpdate],
dispatchers: &HashMap<ActorId, Vec<Dispatcher>>,
init_split_assignment: &SplitAssignment,