Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): rename replace_table to replace_stream_job where appropriate #19537

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::barrier::BarrierManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{MetaSrvEnv, StreamingJob};
use crate::rpc::ddl_controller::{
DdlCommand, DdlController, DropMode, ReplaceTableInfo, StreamingJobId,
DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
};
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
use crate::MetaError;
Expand Down Expand Up @@ -91,13 +91,13 @@ impl DdlServiceImpl {
source,
job_type,
}: ReplaceTablePlan,
) -> ReplaceTableInfo {
) -> ReplaceStreamJobInfo {
let table = table.unwrap();
let col_index_mapping = table_col_index_mapping
.as_ref()
.map(ColIndexMapping::from_protobuf);

ReplaceTableInfo {
ReplaceStreamJobInfo {
streaming_job: StreamingJob::Table(
source,
table,
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/checkpoint/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ impl DatabaseCheckpointControl {
node.state.resps.extend(resps);
finished_jobs.push(TrackingJob::New(TrackingCommand {
info,
replace_table_info: None,
replace_stream_job: None,
}));
});
let task = task.get_or_insert_default();
Expand Down
24 changes: 13 additions & 11 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ pub struct Reschedule {
pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
}

/// Replacing an old table with a new one. All actors in the table job will be rebuilt.
/// Used for `ALTER TABLE` ([`Command::ReplaceTable`]) and sink into table ([`Command::CreateStreamingJob`]).
/// Replacing an old job with a new one. All actors in the job will be rebuilt.
/// Used for `ALTER TABLE` ([`Command::ReplaceStreamJob`]) and sink into table ([`Command::CreateStreamingJob`]).
#[derive(Debug, Clone)]
pub struct ReplaceTablePlan {
pub struct ReplaceStreamJobPlan {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can see no fields here is strongly related with Table.

Copy link
Member Author

@xxchan xxchan Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the contrary, the proto contains catalog.Table, so it's need more refactoring and cannot be simply renamed.

message ReplaceTablePlan {
// The new table catalog, with the correct (old) table ID and a new version.
// If the new version does not match the subsequent version in the meta service's
// catalog, this request will be rejected.
catalog.Table table = 1;
// The new materialization plan, where all schema are updated.
stream_plan.StreamFragmentGraph fragment_graph = 2;
// The mapping from the old columns to the new columns of the table.
// If no column modifications occur (such as for sinking into table), this will be None.
catalog.ColIndexMapping table_col_index_mapping = 3;
// Source catalog of table's associated source
catalog.Source source = 4;
TableJobType job_type = 5;
}

pub old_fragments: StreamJobFragments,
pub new_fragments: StreamJobFragments,
/// Downstream jobs of the replaced job need to update their `Merge` node to
/// connect to the new fragment.
pub merge_updates: Vec<MergeUpdate>,
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
/// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
Expand All @@ -104,7 +106,7 @@ pub struct ReplaceTablePlan {
pub tmp_id: u32,
}

impl ReplaceTablePlan {
impl ReplaceStreamJobPlan {
fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
let mut fragment_changes = HashMap::new();
for fragment in self.new_fragments.fragments.values() {
Expand Down Expand Up @@ -206,7 +208,7 @@ pub struct SnapshotBackfillInfo {
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
Normal,
SinkIntoTable(ReplaceTablePlan),
SinkIntoTable(ReplaceStreamJobPlan),
SnapshotBackfill(SnapshotBackfillInfo),
}

Expand Down Expand Up @@ -271,13 +273,13 @@ pub enum Command {
fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
},

/// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. This is
/// `ReplaceStreamJob` command generates a `Update` barrier with the given `merge_updates`. This is
/// essentially switching the downstream of the old table fragments to the new ones, and
/// dropping the old table fragments. Used for table schema change.
///
/// This can be treated as a special case of `RescheduleFragment`, while the upstream fragment
/// of the Merge executors are changed additionally.
ReplaceTable(ReplaceTablePlan),
ReplaceStreamJob(ReplaceStreamJobPlan),

/// `SourceSplitAssignment` generates a `Splits` barrier for pushing initialized splits or
/// changed splits.
Expand Down Expand Up @@ -384,7 +386,7 @@ impl Command {
})
.collect(),
),
Command::ReplaceTable(plan) => Some(plan.fragment_changes()),
Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
Command::MergeSnapshotBackfillStreamingJobs(_) => None,
Command::SourceSplitAssignment(_) => None,
Command::Throttle(_) => None,
Expand Down Expand Up @@ -688,7 +690,7 @@ impl Command {
subscriptions_to_add,
}));

if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan {
if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
old_fragments,
new_fragments: _,
merge_updates,
Expand Down Expand Up @@ -731,7 +733,7 @@ impl Command {
}))
}

Command::ReplaceTable(ReplaceTablePlan {
Command::ReplaceStreamJob(ReplaceStreamJobPlan {
old_fragments,
merge_updates,
dispatchers,
Expand Down Expand Up @@ -943,7 +945,7 @@ impl Command {
}
Some(map)
}
Command::ReplaceTable(replace_table) => {
Command::ReplaceStreamJob(replace_table) => {
Some(replace_table.new_fragments.actors_to_create())
}
_ => None,
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/barrier/context/context_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerCon
use crate::barrier::progress::TrackingJob;
use crate::barrier::{
BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
CreateStreamingJobType, RecoveryReason, ReplaceTablePlan, Scheduled,
CreateStreamingJobType, RecoveryReason, ReplaceStreamJobPlan, Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -180,7 +180,7 @@ impl CommandContext {
)
.await?;

if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan {
if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
new_fragments,
dispatchers,
init_split_assignment,
Expand Down Expand Up @@ -223,7 +223,7 @@ impl CommandContext {
.await?;
}

Command::ReplaceTable(ReplaceTablePlan {
Command::ReplaceStreamJob(ReplaceStreamJobPlan {
old_fragments,
new_fragments,
dispatchers,
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ mod utils;
mod worker;

pub use self::command::{
BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan,
Reschedule, SnapshotBackfillInfo,
BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub use self::info::InflightSubscriptionInfo;
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
Expand Down
33 changes: 18 additions & 15 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_pb::stream_service::PbBarrierCompleteResponse;

use crate::barrier::info::BarrierInfo;
use crate::barrier::{
Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan,
Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan,
};
use crate::manager::{DdlType, MetadataManager};
use crate::model::{ActorId, BackfillUpstreamType, StreamJobFragments};
Expand Down Expand Up @@ -229,7 +229,7 @@ impl TrackingJob {
.catalog_controller
.finish_streaming_job(
streaming_job.id() as i32,
command.replace_table_info.clone(),
command.replace_stream_job.clone(),
)
.await?;
Ok(())
Expand Down Expand Up @@ -274,7 +274,7 @@ pub struct RecoveredTrackingJob {
/// The command tracking by the [`CreateMviewProgressTracker`].
pub(super) struct TrackingCommand {
pub info: CreateStreamingJobCommandInfo,
pub replace_table_info: Option<ReplaceTablePlan>,
pub replace_stream_job: Option<ReplaceStreamJobPlan>,
}

/// Tracking is done as follows:
Expand Down Expand Up @@ -379,7 +379,10 @@ impl CreateMviewProgressTracker {

pub(super) fn update_tracking_jobs<'a>(
&mut self,
info: Option<(&CreateStreamingJobCommandInfo, Option<&ReplaceTablePlan>)>,
info: Option<(
&CreateStreamingJobCommandInfo,
Option<&ReplaceStreamJobPlan>,
)>,
create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
version_stats: &HummockVersionStats,
) {
Expand All @@ -389,9 +392,9 @@ impl CreateMviewProgressTracker {
let finished_commands = {
let mut commands = vec![];
// Add the command to tracker.
if let Some((create_job_info, replace_table)) = info
if let Some((create_job_info, replace_stream_job)) = info
&& let Some(command) =
self.add(create_job_info, replace_table, version_stats)
self.add(create_job_info, replace_stream_job, version_stats)
{
// Those with no actors to track can be finished immediately.
commands.push(command);
Expand Down Expand Up @@ -429,8 +432,8 @@ impl CreateMviewProgressTracker {
if let Some(Command::CreateStreamingJob { info, job_type }) = command {
match job_type {
CreateStreamingJobType::Normal => Some((info, None)),
CreateStreamingJobType::SinkIntoTable(replace_table) => {
Some((info, Some(replace_table)))
CreateStreamingJobType::SinkIntoTable(replace_stream_job) => {
Some((info, Some(replace_stream_job)))
}
CreateStreamingJobType::SnapshotBackfill(_) => {
// The progress of SnapshotBackfill won't be tracked here
Expand Down Expand Up @@ -494,24 +497,24 @@ impl CreateMviewProgressTracker {
pub fn add(
&mut self,
info: &CreateStreamingJobCommandInfo,
replace_table: Option<&ReplaceTablePlan>,
replace_stream_job: Option<&ReplaceStreamJobPlan>,
version_stats: &HummockVersionStats,
) -> Option<TrackingJob> {
tracing::trace!(?info, "add job to track");
let (info, actors, replace_table_info) = {
let CreateStreamingJobCommandInfo {
stream_job_fragments: table_fragments,
stream_job_fragments,
..
} = info;
let actors = table_fragments.tracking_progress_actor_ids();
let actors = stream_job_fragments.tracking_progress_actor_ids();
if actors.is_empty() {
// The command can be finished immediately.
return Some(TrackingJob::New(TrackingCommand {
info: info.clone(),
replace_table_info: replace_table.cloned(),
replace_stream_job: replace_stream_job.cloned(),
}));
}
(info.clone(), actors, replace_table.cloned())
(info.clone(), actors, replace_stream_job.cloned())
};

let CreateStreamingJobCommandInfo {
Expand Down Expand Up @@ -567,7 +570,7 @@ impl CreateMviewProgressTracker {
// that the sink job has been created.
Some(TrackingJob::New(TrackingCommand {
info,
replace_table_info,
replace_stream_job: replace_table_info,
}))
} else {
let old = self.progress_map.insert(
Expand All @@ -576,7 +579,7 @@ impl CreateMviewProgressTracker {
progress,
TrackingJob::New(TrackingCommand {
info,
replace_table_info,
replace_stream_job: replace_table_info,
}),
),
);
Expand Down
Loading
Loading