Skip to content

Commit

Permalink
update more places
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Nov 22, 2024
1 parent d0cc502 commit b3a88dd
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 35 deletions.
36 changes: 18 additions & 18 deletions src/meta/src/barrier/context/context_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,24 +164,24 @@ impl CommandContext {

Command::CreateStreamingJob { info, job_type } => {
let CreateStreamingJobCommandInfo {
stream_job_fragments: table_fragments,
stream_job_fragments,
dispatchers,
init_split_assignment,
..
} = info;
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
table_fragments.stream_job_id().table_id as _,
table_fragments.actor_ids(),
.post_collect_job_fragments(
stream_job_fragments.stream_job_id().table_id as _,
stream_job_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;

if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan {
new_fragments: new_table_fragments,
new_fragments,
dispatchers,
init_split_assignment,
..
Expand All @@ -190,18 +190,18 @@ impl CommandContext {
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
new_table_fragments.stream_job_id().table_id as _,
new_table_fragments.actor_ids(),
.post_collect_job_fragments(
new_fragments.stream_job_id().table_id as _,
new_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;
}

// Extract the fragments that include source operators.
let source_fragments = table_fragments.stream_source_fragments();
let backfill_fragments = table_fragments.source_backfill_fragments()?;
let source_fragments = stream_job_fragments.stream_source_fragments();
let backfill_fragments = stream_job_fragments.source_backfill_fragments()?;
barrier_manager_context
.source_manager
.apply_source_change(
Expand All @@ -224,8 +224,8 @@ impl CommandContext {
}

Command::ReplaceTable(ReplaceTablePlan {
old_fragments: old_table_fragments,
new_fragments: new_table_fragments,
old_fragments,
new_fragments,
dispatchers,
init_split_assignment,
..
Expand All @@ -234,9 +234,9 @@ impl CommandContext {
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
new_table_fragments.stream_job_id().table_id as _,
new_table_fragments.actor_ids(),
.post_collect_job_fragments(
new_fragments.stream_job_id().table_id as _,
new_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
Expand All @@ -245,11 +245,11 @@ impl CommandContext {
// Apply the split changes in source manager.
barrier_manager_context
.source_manager
.drop_source_fragments_vec(std::slice::from_ref(old_table_fragments))
.drop_source_fragments_vec(std::slice::from_ref(old_fragments))
.await;
let source_fragments = new_table_fragments.stream_source_fragments();
let source_fragments = new_fragments.stream_source_fragments();
// XXX: is it possible to have backfill fragments here?
let backfill_fragments = new_table_fragments.source_backfill_fragments()?;
let backfill_fragments = new_fragments.source_backfill_fragments()?;
barrier_manager_context
.source_manager
.apply_source_change(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl CatalogController {
}

#[allow(clippy::type_complexity)]
pub fn extract_fragment_and_actors_from_table_fragments(
pub fn extract_fragment_and_actors_from_fragments(
PbTableFragments {
table_id,
fragments,
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,13 +403,13 @@ impl CatalogController {
// making them the source of truth and performing a full replacement for those in the meta store?
pub async fn prepare_streaming_job(
&self,
table_fragments: &StreamJobFragments,
stream_job_fragments: &StreamJobFragments,
streaming_job: &StreamingJob,
for_replace: bool,
) -> MetaResult<()> {
let fragment_actors =
Self::extract_fragment_and_actors_from_table_fragments(table_fragments.to_protobuf())?;
let all_tables = table_fragments.all_tables();
Self::extract_fragment_and_actors_from_fragments(stream_job_fragments.to_protobuf())?;
let all_tables = stream_job_fragments.all_tables();
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;

Expand Down Expand Up @@ -601,7 +601,7 @@ impl CatalogController {
Ok((true, Some(database_id)))
}

pub async fn post_collect_table_fragments(
pub async fn post_collect_job_fragments(
&self,
job_id: ObjectId,
actor_ids: Vec<crate::model::ActorId>,
Expand Down
16 changes: 8 additions & 8 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ impl DdlController {

// create fragment and actor catalogs.
tracing::debug!(id = streaming_job.id(), "building streaming job");
let (ctx, table_fragments) = self
let (ctx, stream_job_fragments) = self
.build_stream_job(
ctx,
streaming_job,
Expand All @@ -1057,7 +1057,7 @@ impl DdlController {

match streaming_job {
StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
Self::validate_cdc_table(table, &table_fragments).await?;
Self::validate_cdc_table(table, &stream_job_fragments).await?;
}
StreamingJob::Table(Some(source), ..) => {
// Register the source on the connector node.
Expand All @@ -1076,7 +1076,7 @@ impl DdlController {

self.metadata_manager
.catalog_controller
.prepare_streaming_job(&table_fragments, streaming_job, false)
.prepare_streaming_job(&stream_job_fragments, streaming_job, false)
.await?;

// create streaming jobs.
Expand All @@ -1087,7 +1087,7 @@ impl DdlController {
// FIXME(kwannoel): Unify background stream's creation path with MV below.
| (CreateType::Background, StreamingJob::Sink(_, _)) => {
let version = self.stream_manager
.create_streaming_job(table_fragments, ctx)
.create_streaming_job(stream_job_fragments, ctx)
.await?;
Ok(version)
}
Expand All @@ -1096,7 +1096,7 @@ impl DdlController {
let fut = async move {
let _ = ctrl
.stream_manager
.create_streaming_job(table_fragments, ctx)
.create_streaming_job(stream_job_fragments, ctx)
.await.inspect_err(|err| {
tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
});
Expand Down Expand Up @@ -1198,7 +1198,7 @@ impl DdlController {
)
.await? as u32;

let (ctx, table_fragments) = self
let (ctx, stream_job_fragments) = self
.inject_replace_table_job_for_table_sink(
tmp_id,
&self.metadata_manager,
Expand All @@ -1216,11 +1216,11 @@ impl DdlController {

self.metadata_manager
.catalog_controller
.prepare_streaming_job(&table_fragments, &streaming_job, true)
.prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
.await?;

self.stream_manager
.replace_table(table_fragments, ctx)
.replace_table(stream_job_fragments, ctx)
.await?;

merge_updates
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,20 +348,20 @@ impl GlobalStreamManager {

let need_pause = replace_table_job_info.is_some();

if let Some((streaming_job, context, table_fragments)) = replace_table_job_info {
if let Some((streaming_job, context, stream_job_fragments)) = replace_table_job_info {
self.metadata_manager
.catalog_controller
.prepare_streaming_job(&table_fragments, &streaming_job, true)
.prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
.await?;

let tmp_table_id = table_fragments.stream_job_id();
let tmp_table_id = stream_job_fragments.stream_job_id();
let init_split_assignment = self.source_manager.allocate_splits(&tmp_table_id).await?;

replace_table_id = Some(tmp_table_id);

replace_table_command = Some(ReplaceTablePlan {
old_fragments: context.old_fragments,
new_fragments: table_fragments,
new_fragments: stream_job_fragments,
merge_updates: context.merge_updates,
dispatchers: context.dispatchers,
init_split_assignment,
Expand Down

0 comments on commit b3a88dd

Please sign in to comment.