
Commit

fix
wenym1 committed Aug 6, 2024
1 parent 6b3c2b1 commit e2a219b
Showing 4 changed files with 48 additions and 21 deletions.
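
Taken together, the four hunks below appear to wire up snapshot-backfill streaming jobs: command.rs fills subscriptions_to_add in the AddMutation with one SubscriptionUpstreamInfo per upstream MV table when the job is created via SnapshotBackfill; creating_job_control.rs passes the command's mutation to inject_barrier only when to_finish is set, instead of always None; mod.rs stops reporting a NewTableFragmentInfo for SnapshotBackfill jobs in collect_commit_epoch_info; and progress.rs excludes SnapshotBackfill jobs from the create-mview progress tracker.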
src/meta/src/barrier/command.rs (17 changes: 16 additions & 1 deletion)
@@ -580,13 +580,28 @@ impl CommandContext {
                     .values()
                     .flat_map(build_actor_connector_splits)
                     .collect();
+                let subscriptions_to_add =
+                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
+                        job_type
+                    {
+                        snapshot_backfill_info
+                            .upstream_mv_table_ids
+                            .iter()
+                            .map(|table_id| SubscriptionUpstreamInfo {
+                                subscriber_id: table_fragments.table_id().table_id,
+                                upstream_mv_table_id: table_id.table_id,
+                            })
+                            .collect()
+                    } else {
+                        Default::default()
+                    };
                 let add = Some(Mutation::Add(AddMutation {
                     actor_dispatchers,
                     added_actors,
                     actor_splits,
                     // If the cluster is already paused, the new actors should be paused too.
                     pause: self.current_paused_reason.is_some(),
-                    subscriptions_to_add: Default::default(),
+                    subscriptions_to_add,
                 }));
 
                 if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan {
src/meta/src/barrier/creating_job_control.rs (7 changes: 6 additions & 1 deletion)
@@ -297,7 +297,12 @@ impl CreatingStreamingJobControl {
         let node_to_collect = control_stream_manager.inject_barrier(
             Some(table_id),
             &command_ctx.info.node_map,
-            None,
+            if to_finish {
+                // erase the mutation on upstream except the last Finish command
+                command_ctx.to_mutation()
+            } else {
+                None
+            },
             (&command_ctx.curr_epoch, &command_ctx.prev_epoch),
             &command_ctx.kind,
             fragment_info,
src/meta/src/barrier/mod.rs (32 changes: 17 additions & 15 deletions)
@@ -1596,21 +1596,23 @@ fn collect_commit_epoch_info(
         old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into()));
     }
 
-    let new_table_fragment_info =
-        if let Command::CreateStreamingJob { info, .. } = &command_ctx.command {
-            let table_fragments = &info.table_fragments;
-            Some(NewTableFragmentInfo {
-                table_id: table_fragments.table_id(),
-                mv_table_id: table_fragments.mv_table_id().map(TableId::new),
-                internal_table_ids: table_fragments
-                    .internal_table_ids()
-                    .into_iter()
-                    .map(TableId::new)
-                    .collect(),
-            })
-        } else {
-            None
-        };
+    let new_table_fragment_info = if let Command::CreateStreamingJob { info, job_type } =
+        &command_ctx.command
+        && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
+    {
+        let table_fragments = &info.table_fragments;
+        Some(NewTableFragmentInfo {
+            table_id: table_fragments.table_id(),
+            mv_table_id: table_fragments.mv_table_id().map(TableId::new),
+            internal_table_ids: table_fragments
+                .internal_table_ids()
+                .into_iter()
+                .map(TableId::new)
+                .collect(),
+        })
+    } else {
+        None
+    };
 
     let mut mv_log_store_truncate_epoch = HashMap::new();
     let mut update_truncate_epoch =
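
The new condition in collect_commit_epoch_info chains an if let binding with a !matches!(..) guard, so SnapshotBackfill jobs fall through to None. A minimal sketch of that pattern follows, with made-up stand-ins (Command, Job, and new_table_fragment_name are hypothetical, not RisingWave's types); the let-chain syntax needs a toolchain that supports it, e.g. the Rust 2024 edition or nightly with #![feature(let_chains)]:

// Hypothetical stand-ins, not RisingWave's real Command / CreateStreamingJobType.
// Requires let-chains (Rust 2024 edition, or nightly with #![feature(let_chains)]).
enum Job {
    Normal,
    SnapshotBackfill,
}

enum Command {
    CreateStreamingJob { name: String, job_type: Job },
    Other,
}

// Same shape as the hunk above: bind the job with `if let`, then reject the
// snapshot-backfill case inside the same condition.
fn new_table_fragment_name(command: &Command) -> Option<&str> {
    if let Command::CreateStreamingJob { name, job_type } = command
        && !matches!(job_type, Job::SnapshotBackfill)
    {
        Some(name.as_str())
    } else {
        None
    }
}

fn main() {
    let normal = Command::CreateStreamingJob {
        name: "mv_normal".into(),
        job_type: Job::Normal,
    };
    let backfill = Command::CreateStreamingJob {
        name: "mv_backfill".into(),
        job_type: Job::SnapshotBackfill,
    };
    assert_eq!(new_table_fragment_name(&normal), Some("mv_normal"));
    assert_eq!(new_table_fragment_name(&backfill), None);
    assert_eq!(new_table_fragment_name(&Command::Other), None);
}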
src/meta/src/barrier/progress.rs (13 changes: 9 additions & 4 deletions)
@@ -420,10 +420,15 @@ impl CreateMviewProgressTracker {
            let command_ctx = &epoch_node.command_ctx;
            let new_tracking_job_info =
                if let Command::CreateStreamingJob { info, job_type } = &command_ctx.command {
-                    if let CreateStreamingJobType::SinkIntoTable(replace_table) = job_type {
-                        Some((info, Some(replace_table)))
-                    } else {
-                        Some((info, None))
+                    match job_type {
+                        CreateStreamingJobType::Normal => Some((info, None)),
+                        CreateStreamingJobType::SinkIntoTable(replace_table) => {
+                            Some((info, Some(replace_table)))
+                        }
+                        CreateStreamingJobType::SnapshotBackfill(_) => {
+                            // The progress of SnapshotBackfill won't be tracked here
+                            None
+                        }
                    }
                } else {
                    None
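
The progress.rs hunk replaces the earlier if let / else with an exhaustive match over the job type, so adding a variant like SnapshotBackfill forces every match site to decide how to handle it instead of silently taking the else branch. A minimal standalone sketch of that pattern (JobType and tracking_info are hypothetical stand-ins, not RisingWave's actual types):

enum JobType {
    Normal,
    SinkIntoTable(String),      // stand-in for ReplaceTablePlan
    SnapshotBackfill(Vec<u32>), // stand-in for the upstream MV table ids
}

// Mirrors the new_tracking_job_info logic: Some(inner) means "track this job",
// and the inner Option carries the replace-table info when present.
fn tracking_info(job_type: &JobType) -> Option<Option<&String>> {
    match job_type {
        JobType::Normal => Some(None),
        JobType::SinkIntoTable(replace_table) => Some(Some(replace_table)),
        // Not tracked here; with the old if-let/else this variant would have
        // been treated like a normal job.
        JobType::SnapshotBackfill(_) => None,
    }
}

fn main() {
    assert_eq!(tracking_info(&JobType::Normal), Some(None));
    assert_eq!(
        tracking_info(&JobType::SinkIntoTable("t".into())),
        Some(Some(&"t".to_string()))
    );
    assert_eq!(tracking_info(&JobType::SnapshotBackfill(vec![1, 2])), None);
}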
