Skip to content

Commit

Permalink
avoid clone
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 6, 2024
1 parent a42fea6 commit 516da40
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,20 @@ impl DdlController {
.unwrap();
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

let streaming_job_id = streaming_job.id();
let streaming_job_name = streaming_job.name().to_string();
let streaming_job_definition = streaming_job.definition().to_string();
let src_id = match &streaming_job {
StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
_ => None,
};

// create streaming job.
match self
.create_streaming_job_inner_v2(
mgr,
ctx,
streaming_job.clone(),
streaming_job,
fragment_graph,
affected_table_replace_info,
)
Expand All @@ -97,9 +105,9 @@ impl DdlController {
Err(err) => {
tracing::error!(id = job_id, error = ?err.as_report(), "failed to create streaming job");
let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
id: streaming_job.id(),
name: streaming_job.name(),
definition: streaming_job.definition(),
id: streaming_job_id,
name: streaming_job_name,
definition: streaming_job_definition,
error: err.as_report().to_string(),
};
self.env.event_log_manager_ref().add_event_logs(vec![
Expand All @@ -111,11 +119,10 @@ impl DdlController {
.await?;
if aborted {
tracing::warn!(id = job_id, "aborted streaming job");
match &streaming_job {
StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => {
self.source_manager.unregister_sources(vec![src.id]).await;
}
_ => {}
if let Some(src_id) = src_id {
self.source_manager
.unregister_sources(vec![src_id as _])
.await;
}
}
Err(err)
Expand Down

0 comments on commit 516da40

Please sign in to comment.