Skip to content

Commit

Permalink
refactor: clarify the meaning of table in TableCatalog and TableFragm…
Browse files Browse the repository at this point in the history
…ents (#19510)

Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Nov 22, 2024
1 parent ca5f13a commit 09a8ff3
Show file tree
Hide file tree
Showing 22 changed files with 231 additions and 214 deletions.
5 changes: 5 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,11 @@ message Function {
message AggregateFunction {}
}

// Includes full information about a table.
//
// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
// It is not the same as a user-side table created by `CREATE TABLE`.
//
// See `TableCatalog` struct in frontend crate for more information.
message Table {
enum TableType {
Expand Down
2 changes: 1 addition & 1 deletion proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ message DropIndexResponse {
}

message ReplaceTablePlan {
// The new table catalog, with the correct table ID and a new version.
// The new table catalog, with the correct (old) table ID and a new version.
// If the new version does not match the subsequent version in the meta service's
// catalog, this request will be rejected.
catalog.Table table = 1;
Expand Down
5 changes: 4 additions & 1 deletion proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ service HeartbeatService {
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
}

// Fragments of a Streaming Job
// Fragments of a Streaming Job.
// It's for all kinds of streaming jobs, and ideally should be called `StreamingJobFragments`.
// It's not the same as a storage table correlated with a `TableCatalog`.
message TableFragments {
// The state of the fragments of this table
enum State {
Expand Down Expand Up @@ -96,6 +98,7 @@ message TableFragments {
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 8;
}
// The id of the streaming job.
uint32 table_id = 1;
State state = 2;
map<uint32, Fragment> fragments = 3;
Expand Down
11 changes: 5 additions & 6 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@ use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::user::UserId;

/// Includes full information about a table.
/// `TableCatalog` includes full information about a table.
///
/// Currently, it can be either:
/// - a table or a source
/// - a materialized view
/// - an index
/// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
/// It is not the same as a user-side table created by `CREATE TABLE`.
///
/// Use `self.table_type()` to determine the type of the table.
/// Use [`Self::table_type()`] to determine the [`TableType`] of the table.
///
/// # Column ID & Column Index
///
Expand Down Expand Up @@ -191,6 +189,7 @@ pub enum TableType {
/// Tables created by `CREATE MATERIALIZED VIEW`.
MaterializedView,
/// Tables serving as index for `TableType::Table` or `TableType::MaterializedView`.
/// An index has both a `TableCatalog` and an `IndexCatalog`.
Index,
/// Internal tables for executors.
Internal,
Expand Down
10 changes: 7 additions & 3 deletions src/meta/src/barrier/checkpoint/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,11 @@ impl CheckpointControl {
.values()
{
progress.extend([(
creating_job.info.table_fragments.table_id().table_id,
creating_job
.info
.stream_job_fragments
.stream_job_id()
.table_id,
creating_job.gen_ddl_progress(),
)]);
}
Expand Down Expand Up @@ -676,7 +680,7 @@ impl DatabaseCheckpointControl {
resps,
self.creating_streaming_job_controls[&table_id]
.info
.table_fragments
.stream_job_fragments
.all_table_ids()
.map(TableId::new),
is_first_time,
Expand Down Expand Up @@ -830,7 +834,7 @@ impl DatabaseCheckpointControl {
.expect("checked Some")
.to_mutation(None)
.expect("should have some mutation in `CreateStreamingJob` command");
let job_id = info.table_fragments.table_id();
let job_id = info.stream_job_fragments.stream_job_id();
control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?;
self.creating_streaming_job_controls.insert(
job_id,
Expand Down
20 changes: 10 additions & 10 deletions src/meta/src/barrier/checkpoint/creating_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@ impl CreatingStreamingJobControl {
initial_mutation: Mutation,
) -> Self {
info!(
table_id = info.table_fragments.table_id().table_id,
table_id = info.stream_job_fragments.stream_job_id().table_id,
definition = info.definition,
"new creating job"
);
let snapshot_backfill_actors = info.table_fragments.snapshot_backfill_actor_ids();
let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
let mut create_mview_tracker = CreateMviewProgressTracker::default();
create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat);
let fragment_infos: HashMap<_, _> = info.new_fragment_info().collect();

let table_id = info.table_fragments.table_id();
let table_id = info.stream_job_fragments.stream_job_id();
let table_id_str = format!("{}", table_id.table_id);

let actors_to_create = info.table_fragments.actors_to_create();
let actors_to_create = info.stream_job_fragments.actors_to_create();
let graph_info = InflightStreamingJobInfo {
job_id: table_id,
fragment_infos,
Expand Down Expand Up @@ -121,7 +121,7 @@ impl CreatingStreamingJobControl {
} else {
let progress = create_mview_tracker
.gen_ddl_progress()
.remove(&self.info.table_fragments.table_id().table_id)
.remove(&self.info.stream_job_fragments.stream_job_id().table_id)
.expect("should exist");
format!("Snapshot [{}]", progress.progress)
}
Expand All @@ -143,7 +143,7 @@ impl CreatingStreamingJobControl {
}
};
DdlProgress {
id: self.info.table_fragments.table_id().table_id as u64,
id: self.info.stream_job_fragments.stream_job_id().table_id as u64,
statement: self.info.definition.clone(),
progress,
}
Expand Down Expand Up @@ -202,7 +202,7 @@ impl CreatingStreamingJobControl {
command: Option<&Command>,
barrier_info: &BarrierInfo,
) -> MetaResult<()> {
let table_id = self.info.table_fragments.table_id();
let table_id = self.info.stream_job_fragments.stream_job_id();
let start_consume_upstream =
if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
jobs_to_merge.contains_key(&table_id)
Expand All @@ -211,7 +211,7 @@ impl CreatingStreamingJobControl {
};
if start_consume_upstream {
info!(
table_id = self.info.table_fragments.table_id().table_id,
table_id = self.info.stream_job_fragments.stream_job_id().table_id,
prev_epoch = barrier_info.prev_epoch(),
"start consuming upstream"
);
Expand All @@ -235,7 +235,7 @@ impl CreatingStreamingJobControl {
{
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
self.info.table_fragments.table_id(),
self.info.stream_job_fragments.stream_job_id(),
control_stream_manager,
&mut self.barrier_control,
&self.graph_info,
Expand All @@ -260,7 +260,7 @@ impl CreatingStreamingJobControl {
let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress);
self.barrier_control.collect(epoch, worker_id, resp);
if let Some(prev_barriers_to_inject) = prev_barriers_to_inject {
let table_id = self.info.table_fragments.table_id();
let table_id = self.info.stream_job_fragments.stream_job_id();
for info in prev_barriers_to_inject {
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
Expand Down
Loading

0 comments on commit 09a8ff3

Please sign in to comment.