diff --git a/src/connector/src/sink/iceberg/jni_catalog.rs b/src/connector/src/sink/iceberg/jni_catalog.rs index 84067b26bf571..5d52801eca572 100644 --- a/src/connector/src/sink/iceberg/jni_catalog.rs +++ b/src/connector/src/sink/iceberg/jni_catalog.rs @@ -17,15 +17,17 @@ use std::collections::HashMap; use std::sync::Arc; +use anyhow::Context; use async_trait::async_trait; use icelake::catalog::models::{CommitTableRequest, CommitTableResponse, LoadTableResult}; use icelake::catalog::{ BaseCatalogConfig, Catalog, CatalogRef, IcebergTableIoArgs, OperatorCreator, UpdateTable, }; use icelake::{ErrorKind, Table, TableIdentifier}; -use jni::objects::{GlobalRef, JObject, JString}; +use jni::objects::{GlobalRef, JObject}; use jni::JavaVM; -use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, JVM}; +use risingwave_jni_core::call_method; +use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM}; use crate::sink::{Result, SinkError}; @@ -42,172 +44,97 @@ impl Catalog for JniCatalog { } async fn load_table(self: Arc, table_name: &TableIdentifier) -> icelake::Result { - let _guard = self.jvm.attach_current_thread().map_err(|e| { - icelake::Error::new( - ErrorKind::Unexpected, - "Failed to attach current thread to jvm", - ) - .set_source(e) - })?; + execute_with_jni_env(self.jvm, |env| { + let table_name_str = table_name.to_string(); - let mut env = self.jvm.get_env().map_err(|e| { - icelake::Error::new(ErrorKind::Unexpected, "Failed to get jni env").set_source(e) - })?; + let table_name_jstr = env.new_string(&table_name_str).unwrap(); - let table_name_str = table_name.to_string(); + let result_json = + call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)}, + &table_name_jstr) + .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?; - let table_name_jstr = env.new_string(&table_name_str).unwrap(); + let rust_json_str = jobj_to_str(env, result_json)?; - let result_json = env - .call_method( - self.java_catalog.as_obj(), - "loadTable", - "(Ljava/lang/String;)Ljava/lang/String;", - &[(&table_name_jstr).into()], - ) - .map_err(|e| { - icelake::Error::new( - ErrorKind::Unexpected, - "Failed to call loadTable in jni catalog", - ) - .set_source(e) - .with_context("table_name", &table_name_str) - })?; + let resp: LoadTableResult = serde_json::from_str(&rust_json_str)?; - let rust_json_str = unsafe { - let jvalue = result_json.as_jni(); - let jstring = JString::from_raw(jvalue.l); - let java_str = env.get_string(&jstring).map_err(|e| { + let metadata_location = resp.metadata_location.clone().ok_or_else(|| { icelake::Error::new( - ErrorKind::Unexpected, - "Failed to get string from return value of loadTable method of jni catalog", + icelake::ErrorKind::IcebergFeatureUnsupported, + "Loading uncommitted table is not supported!", ) - .set_source(e) })?; - java_str - .to_str() - .map_err(|e| { - icelake::Error::new( - ErrorKind::Unexpected, - "Failed to convert java string to rust utf8 string", - ) - .set_source(e) - .with_context("Java string bytes", format!("{:?}", java_str.to_bytes())) - })? - .to_string() - }; - - let resp: LoadTableResult = serde_json::from_str(&rust_json_str)?; + tracing::info!("Table metadata location of {table_name} is {metadata_location}"); - let metadata_location = resp.metadata_location.clone().ok_or_else(|| { - icelake::Error::new( - icelake::ErrorKind::IcebergFeatureUnsupported, - "Loading uncommitted table is not supported!", - ) - })?; - - tracing::info!("Table metadata location of {table_name} is {metadata_location}"); - - let table_metadata = resp.table_metadata()?; + let table_metadata = resp.table_metadata()?; - let iceberg_io_args = IcebergTableIoArgs::builder_from_path(&table_metadata.location)? - .with_args(self.config.table_io_configs.iter()) - .build()?; - let table_op = iceberg_io_args.create()?; + let iceberg_io_args = IcebergTableIoArgs::builder_from_path(&table_metadata.location)? + .with_args(self.config.table_io_configs.iter()) + .build()?; + let table_op = iceberg_io_args.create()?; - Ok( - Table::builder_from_catalog(table_op, self.clone(), table_metadata, table_name.clone()) - .build()?, - ) + Ok(Table::builder_from_catalog( + table_op, + self.clone(), + table_metadata, + table_name.clone(), + ) + .build()?) + }) + .map_err(|e| { + icelake::Error::new(ErrorKind::Unexpected, "Failed to load iceberg table.") + .set_source(e) + }) } async fn update_table(self: Arc, update_table: &UpdateTable) -> icelake::Result
{ - let _guard = self.jvm.attach_current_thread().map_err(|e| { - icelake::Error::new( - ErrorKind::Unexpected, - "Failed to attach current thread to jvm", - ) - .set_source(e) - })?; - - let request_str = serde_json::to_string(&CommitTableRequest::try_from(update_table)?)?; + execute_with_jni_env(self.jvm, |env| { + let request_str = serde_json::to_string(&CommitTableRequest::try_from(update_table)?)?; - let mut env = self.jvm.get_env().map_err(|e| { - icelake::Error::new(ErrorKind::Unexpected, "Failed to get jni env").set_source(e) - })?; - - let request_jni_str = env.new_string(&request_str).map_err(|e| { - icelake::Error::new( - ErrorKind::Unexpected, - "Failed to create jni string from request json", - ) - .set_source(e) - .with_context("request", &request_str) - })?; - - let result_json = env - .call_method( - self.java_catalog.as_obj(), - "updateTable", - "(Ljava/lang/String;)Ljava/lang/String;", - &[(&request_jni_str).into()], - ) - .map_err(|e| { - icelake::Error::new( - ErrorKind::Unexpected, - "Failed to call updateTable in jni catalog", - ) - .set_source(e) - .with_context("request json", &request_str) + let request_jni_str = env.new_string(&request_str).with_context(|| { + format!("Failed to create jni string from request json: {request_str}.") })?; - let rust_json_str = unsafe { - let jvalue = result_json.as_jni(); - let jstring = JString::from_raw(jvalue.l); - let java_str = env.get_string(&jstring).map_err(|e| { - icelake::Error::new( - ErrorKind::Unexpected, - "Failed to get string from return value of updateTable method of jni catalog", - ) - .set_source(e) - })?; - - java_str - .to_str() - .map_err(|e| { - icelake::Error::new( - ErrorKind::Unexpected, - "Failed to convert java string to rust utf8 string", + let result_json = + call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)}, + &request_jni_str) + .with_context(|| { + format!( + "Failed to update iceberg table: {}", + update_table.table_name() ) - .set_source(e) - .with_context("Java string bytes", format!("{:?}", java_str.to_bytes())) - })? - .to_string() - }; + })?; + + let rust_json_str = jobj_to_str(env, result_json)?; - let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?; + let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?; - tracing::info!( - "Table metadata location of {} is {}", - update_table.table_name(), - response.metadata_location - ); + tracing::info!( + "Table metadata location of {} is {}", + update_table.table_name(), + response.metadata_location + ); - let table_metadata = response.metadata()?; + let table_metadata = response.metadata()?; - let args = IcebergTableIoArgs::builder_from_path(&table_metadata.location)? - .with_args(self.config.table_io_configs.iter()) - .build()?; - let table_op = args.create()?; + let args = IcebergTableIoArgs::builder_from_path(&table_metadata.location)? + .with_args(self.config.table_io_configs.iter()) + .build()?; + let table_op = args.create()?; - Ok(Table::builder_from_catalog( - table_op, - self.clone(), - table_metadata, - update_table.table_name().clone(), - ) - .build()?) + Ok(Table::builder_from_catalog( + table_op, + self.clone(), + table_metadata, + update_table.table_name().clone(), + ) + .build()?) + }) + .map_err(|e| { + icelake::Error::new(ErrorKind::Unexpected, "Failed to update iceberg table.") + .set_source(e) + }) } } diff --git a/src/jni_core/src/jvm_runtime.rs b/src/jni_core/src/jvm_runtime.rs index d96b523f80766..4efb107317274 100644 --- a/src/jni_core/src/jvm_runtime.rs +++ b/src/jni_core/src/jvm_runtime.rs @@ -19,6 +19,7 @@ use std::path::Path; use std::sync::OnceLock; use anyhow::{anyhow, bail, Context}; +use jni::objects::{JObject, JString}; use jni::strings::JNIString; use jni::{InitArgsBuilder, JNIEnv, JNIVersion, JavaVM, NativeMethod}; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; @@ -209,3 +210,13 @@ pub fn execute_with_jni_env( f(&mut env) } + +/// A helper method to convert an java object to rust string. +pub fn jobj_to_str(env: &mut JNIEnv<'_>, obj: JObject<'_>) -> anyhow::Result { + if !env.is_instance_of(&obj, "java/lang/String")? { + bail!("Input object is not a java string and can't be converted!") + } + let jstr = JString::from(obj); + let java_str = env.get_string(&jstr)?; + Ok(java_str.to_str()?.to_string()) +}