Skip to content

Commit

Permalink
remove unnecessary reference to stream job
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 7, 2024
1 parent 03c4310 commit e6f726a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
7 changes: 2 additions & 5 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,7 @@ impl DdlController {
pub(crate) async fn build_stream_job(
&self,
stream_ctx: StreamContext,
mut stream_job: StreamingJob,
stream_job: StreamingJob,
fragment_graph: StreamFragmentGraph,
affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
) -> MetaResult<(CreateStreamingJobContext, TableFragments)> {
Expand Down Expand Up @@ -1654,7 +1654,7 @@ impl DdlController {
}

// Do some type-specific work for each type of stream job.
match stream_job {
match &mut ctx.streaming_job {
StreamingJob::Table(None, ref table, TableJobType::SharedCdcSource) => {
Self::validate_cdc_table(table, &table_fragments)
.await
Expand All @@ -1672,9 +1672,6 @@ impl DdlController {
&ctx.replace_table_job_info
{
*target_table = Some((table.clone(), source.clone()));
if let StreamingJob::Sink(_, ref mut target_table) = &mut ctx.streaming_job {
*target_table = Some((table.clone(), source.clone()));
}
}

// Validate the sink on the connector node.
Expand Down
14 changes: 7 additions & 7 deletions src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ impl DdlController {
_ => None,
};

let fragment_graph =
StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job).await?;
streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id());
streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id());

// create streaming job.
match self
.create_streaming_job_inner_v2(
Expand Down Expand Up @@ -133,15 +138,10 @@ impl DdlController {
&self,
mgr: &MetadataManagerV2,
ctx: StreamContext,
mut streaming_job: StreamingJob,
fragment_graph: StreamFragmentGraphProto,
streaming_job: StreamingJob,
mut fragment_graph: StreamFragmentGraph,
affected_table_replace_info: Option<ReplaceTableInfo>,
) -> MetaResult<NotificationVersion> {
let mut fragment_graph =
StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job).await?;
streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id());
streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id());

// create internal table catalogs and refill table id.
let internal_tables = fragment_graph.internal_tables().into_values().collect_vec();
let table_id_map = mgr
Expand Down

0 comments on commit e6f726a

Please sign in to comment.