From 6c82b0a724f550f7d096993681a3123ffe1fe6d6 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 21 Mar 2024 07:33:17 +0000 Subject: [PATCH] fix(connector): fix null pointer in connector (#15829) (#15834) Co-authored-by: StrikeW --- .../source/core/DbzCdcEngineRunner.java | 4 +- src/jni_core/src/lib.rs | 60 ++++++++++++------- 2 files changed, 43 insertions(+), 21 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java index 2b55adc44a306..2f2a9408e7e06 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java @@ -86,7 +86,9 @@ public static CdcEngineRunner create(DbzConnectorConfig config, long channelPtr) message, error); String errorMsg = - (error != null ? error.getMessage() : message); + (error != null && error.getMessage() != null + ? error.getMessage() + : message); if (!Binding.sendCdcSourceErrorToChannel( channelPtr, errorMsg)) { LOG.warn( diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index ac5192700fae5..032d2f3deb542 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -1024,16 +1024,26 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceErrorTo msg: JString<'a>, ) -> jboolean { execute_and_catch(env, move |env| { - let err_msg: String = env - .get_string(&msg) - .expect("source error message should be a java string") - .into(); - - match channel.as_ref().blocking_send(Err(anyhow!(err_msg))) { - Ok(_) => Ok(JNI_TRUE), - Err(e) => { - tracing::error!(error = ?e.as_report(), "send error"); - Ok(JNI_FALSE) + let ret = env.get_string(&msg); + match ret { + Ok(str) => { + let err_msg: String = str.into(); + match channel.as_ref().blocking_send(Err(anyhow!(err_msg))) { + Ok(_) => Ok(JNI_TRUE), + Err(e) => { + tracing::info!(error = ?e.as_report(), "send error"); + Ok(JNI_FALSE) + } + } + } + Err(err) => { + if msg.is_null() { + tracing::warn!("source error message is null"); + Ok(JNI_TRUE) + } else { + tracing::error!(error = ?err.as_report(), "source error message should be a java string"); + Ok(JNI_FALSE) + } } } }) @@ -1129,16 +1139,26 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendSinkWriterEr msg: JString<'a>, ) -> jboolean { execute_and_catch(env, move |env| { - let err_msg: String = env - .get_string(&msg) - .expect("sink error message should be a java string") - .into(); - - match channel.as_ref().blocking_send(Err(anyhow!(err_msg))) { - Ok(_) => Ok(JNI_TRUE), - Err(e) => { - tracing::info!(error = ?e.as_report(), "send error"); - Ok(JNI_FALSE) + let ret = env.get_string(&msg); + match ret { + Ok(str) => { + let err_msg: String = str.into(); + match channel.as_ref().blocking_send(Err(anyhow!(err_msg))) { + Ok(_) => Ok(JNI_TRUE), + Err(e) => { + tracing::info!(error = ?e.as_report(), "send error"); + Ok(JNI_FALSE) + } + } + } + Err(err) => { + if msg.is_null() { + tracing::warn!("sink error message is null"); + Ok(JNI_TRUE) + } else { + tracing::error!(error = ?err.as_report(), "sink error message should be a java string"); + Ok(JNI_FALSE) + } } } })