diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 0b5948646637b..0871125866d07 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -52,6 +52,7 @@ use risingwave_rpc_client::{ }; use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; @@ -152,47 +153,53 @@ impl Sink for RemoteSink { } }).try_collect()?; - let mut env = JVM - .get_or_init()? - .attach_current_thread() - .map_err(|err| SinkError::Internal(err.into()))?; - let validate_sink_request = ValidateSinkRequest { - sink_param: Some(self.param.to_proto()), - }; - let validate_sink_request_bytes = env - .byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request)) - .map_err(|err| SinkError::Internal(err.into()))?; + let jvm = JVM.get_or_init()?; + let sink_param = self.param.to_proto(); + + spawn_blocking(move || { + let mut env = jvm + .attach_current_thread() + .map_err(|err| SinkError::Internal(err.into()))?; + let validate_sink_request = ValidateSinkRequest { + sink_param: Some(sink_param), + }; + let validate_sink_request_bytes = env + .byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request)) + .map_err(|err| SinkError::Internal(err.into()))?; + + let response = env + .call_static_method( + "com/risingwave/connector/JniSinkValidationHandler", + "validate", + "([B)[B", + &[JValue::Object(&validate_sink_request_bytes)], + ) + .map_err(|err| SinkError::Internal(err.into()))?; + + let validate_sink_response_bytes = match response { + JValueOwned::Object(o) => unsafe { JByteArray::from_raw(o.into_raw()) }, + _ => unreachable!(), + }; - let response = env - .call_static_method( - "com/risingwave/connector/JniSinkValidationHandler", - "validate", - "([B)[B", - &[JValue::Object(&validate_sink_request_bytes)], + let validate_sink_response: ValidateSinkResponse = Message::decode( + risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, &mut env) + .map_err(|err| SinkError::Internal(err.into()))? + .deref(), ) .map_err(|err| SinkError::Internal(err.into()))?; - let validate_sink_response_bytes = match response { - JValueOwned::Object(o) => unsafe { JByteArray::from_raw(o.into_raw()) }, - _ => unreachable!(), - }; - - let validate_sink_response: ValidateSinkResponse = Message::decode( - risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, &mut env) - .map_err(|err| SinkError::Internal(err.into()))? - .deref(), - ) - .map_err(|err| SinkError::Internal(err.into()))?; - - validate_sink_response.error.map_or_else( - || Ok(()), // If there is no error message, return Ok here. - |err| { - Err(SinkError::Remote(anyhow!(format!( - "sink cannot pass validation: {}", - err.error_message - )))) - }, - ) + validate_sink_response.error.map_or_else( + || Ok(()), // If there is no error message, return Ok here. + |err| { + Err(SinkError::Remote(anyhow!(format!( + "sink cannot pass validation: {}", + err.error_message + )))) + }, + ) + }) + .await + .map_err(|e| anyhow!("unable to validate: {:?}", e))? } }