diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 39e45592305d5..4ef0d4df2bc1f 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -82,12 +82,20 @@ impl DdlController { .unwrap(); let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; + let streaming_job_id = streaming_job.id(); + let streaming_job_name = streaming_job.name().to_string(); + let streaming_job_definition = streaming_job.definition().to_string(); + let src_id = match &streaming_job { + StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id), + _ => None, + }; + // create streaming job. match self .create_streaming_job_inner_v2( mgr, ctx, - streaming_job.clone(), + streaming_job, fragment_graph, affected_table_replace_info, ) @@ -97,9 +105,9 @@ impl DdlController { Err(err) => { tracing::error!(id = job_id, error = ?err.as_report(), "failed to create streaming job"); let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail { - id: streaming_job.id(), - name: streaming_job.name(), - definition: streaming_job.definition(), + id: streaming_job_id, + name: streaming_job_name, + definition: streaming_job_definition, error: err.as_report().to_string(), }; self.env.event_log_manager_ref().add_event_logs(vec![ @@ -111,11 +119,10 @@ impl DdlController { .await?; if aborted { tracing::warn!(id = job_id, "aborted streaming job"); - match &streaming_job { - StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => { - self.source_manager.unregister_sources(vec![src.id]).await; - } - _ => {} + if let Some(src_id) = src_id { + self.source_manager + .unregister_sources(vec![src_id as _]) + .await; } } Err(err)