Skip to content

Commit

Permalink
Some enhancement
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Jan 11, 2024
1 parent ac57429 commit 914b962
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 144 deletions.
215 changes: 71 additions & 144 deletions src/connector/src/sink/iceberg/jni_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
use async_trait::async_trait;
use icelake::catalog::models::{CommitTableRequest, CommitTableResponse, LoadTableResult};
use icelake::catalog::{
BaseCatalogConfig, Catalog, CatalogRef, IcebergTableIoArgs, OperatorCreator, UpdateTable,
};
use icelake::{ErrorKind, Table, TableIdentifier};
use jni::objects::{GlobalRef, JObject, JString};
use jni::objects::{GlobalRef, JObject};
use jni::JavaVM;
use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, JVM};
use risingwave_jni_core::call_method;
use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM};

use crate::sink::{Result, SinkError};

Expand All @@ -42,172 +44,97 @@ impl Catalog for JniCatalog {
}

async fn load_table(self: Arc<Self>, table_name: &TableIdentifier) -> icelake::Result<Table> {
let _guard = self.jvm.attach_current_thread().map_err(|e| {
icelake::Error::new(
ErrorKind::Unexpected,
"Failed to attach current thread to jvm",
)
.set_source(e)
})?;
execute_with_jni_env(self.jvm, |env| {
let table_name_str = table_name.to_string();

let mut env = self.jvm.get_env().map_err(|e| {
icelake::Error::new(ErrorKind::Unexpected, "Failed to get jni env").set_source(e)
})?;
let table_name_jstr = env.new_string(&table_name_str).unwrap();

let table_name_str = table_name.to_string();
let result_json =
call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
&table_name_jstr)
.with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;

let table_name_jstr = env.new_string(&table_name_str).unwrap();
let rust_json_str = jobj_to_str(env, result_json)?;

let result_json = env
.call_method(
self.java_catalog.as_obj(),
"loadTable",
"(Ljava/lang/String;)Ljava/lang/String;",
&[(&table_name_jstr).into()],
)
.map_err(|e| {
icelake::Error::new(
ErrorKind::Unexpected,
"Failed to call loadTable in jni catalog",
)
.set_source(e)
.with_context("table_name", &table_name_str)
})?;
let resp: LoadTableResult = serde_json::from_str(&rust_json_str)?;

let rust_json_str = unsafe {
let jvalue = result_json.as_jni();
let jstring = JString::from_raw(jvalue.l);
let java_str = env.get_string(&jstring).map_err(|e| {
let metadata_location = resp.metadata_location.clone().ok_or_else(|| {
icelake::Error::new(
ErrorKind::Unexpected,
"Failed to get string from return value of loadTable method of jni catalog",
icelake::ErrorKind::IcebergFeatureUnsupported,
"Loading uncommitted table is not supported!",
)
.set_source(e)
})?;

java_str
.to_str()
.map_err(|e| {
icelake::Error::new(
ErrorKind::Unexpected,
"Failed to convert java string to rust utf8 string",
)
.set_source(e)
.with_context("Java string bytes", format!("{:?}", java_str.to_bytes()))
})?
.to_string()
};

let resp: LoadTableResult = serde_json::from_str(&rust_json_str)?;
tracing::info!("Table metadata location of {table_name} is {metadata_location}");

let metadata_location = resp.metadata_location.clone().ok_or_else(|| {
icelake::Error::new(
icelake::ErrorKind::IcebergFeatureUnsupported,
"Loading uncommitted table is not supported!",
)
})?;

tracing::info!("Table metadata location of {table_name} is {metadata_location}");

let table_metadata = resp.table_metadata()?;
let table_metadata = resp.table_metadata()?;

let iceberg_io_args = IcebergTableIoArgs::builder_from_path(&table_metadata.location)?
.with_args(self.config.table_io_configs.iter())
.build()?;
let table_op = iceberg_io_args.create()?;
let iceberg_io_args = IcebergTableIoArgs::builder_from_path(&table_metadata.location)?
.with_args(self.config.table_io_configs.iter())
.build()?;
let table_op = iceberg_io_args.create()?;

Ok(
Table::builder_from_catalog(table_op, self.clone(), table_metadata, table_name.clone())
.build()?,
)
Ok(Table::builder_from_catalog(
table_op,
self.clone(),
table_metadata,
table_name.clone(),
)
.build()?)
})
.map_err(|e| {
icelake::Error::new(ErrorKind::Unexpected, "Failed to load iceberg table.")
.set_source(e)
})
}

async fn update_table(self: Arc<Self>, update_table: &UpdateTable) -> icelake::Result<Table> {
let _guard = self.jvm.attach_current_thread().map_err(|e| {
icelake::Error::new(
ErrorKind::Unexpected,
"Failed to attach current thread to jvm",
)
.set_source(e)
})?;

let request_str = serde_json::to_string(&CommitTableRequest::try_from(update_table)?)?;
execute_with_jni_env(self.jvm, |env| {
let request_str = serde_json::to_string(&CommitTableRequest::try_from(update_table)?)?;

let mut env = self.jvm.get_env().map_err(|e| {
icelake::Error::new(ErrorKind::Unexpected, "Failed to get jni env").set_source(e)
})?;

let request_jni_str = env.new_string(&request_str).map_err(|e| {
icelake::Error::new(
ErrorKind::Unexpected,
"Failed to create jni string from request json",
)
.set_source(e)
.with_context("request", &request_str)
})?;

let result_json = env
.call_method(
self.java_catalog.as_obj(),
"updateTable",
"(Ljava/lang/String;)Ljava/lang/String;",
&[(&request_jni_str).into()],
)
.map_err(|e| {
icelake::Error::new(
ErrorKind::Unexpected,
"Failed to call updateTable in jni catalog",
)
.set_source(e)
.with_context("request json", &request_str)
let request_jni_str = env.new_string(&request_str).with_context(|| {
format!("Failed to create jni string from request json: {request_str}.")
})?;

let rust_json_str = unsafe {
let jvalue = result_json.as_jni();
let jstring = JString::from_raw(jvalue.l);
let java_str = env.get_string(&jstring).map_err(|e| {
icelake::Error::new(
ErrorKind::Unexpected,
"Failed to get string from return value of updateTable method of jni catalog",
)
.set_source(e)
})?;

java_str
.to_str()
.map_err(|e| {
icelake::Error::new(
ErrorKind::Unexpected,
"Failed to convert java string to rust utf8 string",
let result_json =
call_method!(env, self.java_catalog.as_obj(), {String updateTable(String)},
&request_jni_str)
.with_context(|| {
format!(
"Failed to update iceberg table: {}",
update_table.table_name()
)
.set_source(e)
.with_context("Java string bytes", format!("{:?}", java_str.to_bytes()))
})?
.to_string()
};
})?;

let rust_json_str = jobj_to_str(env, result_json)?;

let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;
let response: CommitTableResponse = serde_json::from_str(&rust_json_str)?;

tracing::info!(
"Table metadata location of {} is {}",
update_table.table_name(),
response.metadata_location
);
tracing::info!(
"Table metadata location of {} is {}",
update_table.table_name(),
response.metadata_location
);

let table_metadata = response.metadata()?;
let table_metadata = response.metadata()?;

let args = IcebergTableIoArgs::builder_from_path(&table_metadata.location)?
.with_args(self.config.table_io_configs.iter())
.build()?;
let table_op = args.create()?;
let args = IcebergTableIoArgs::builder_from_path(&table_metadata.location)?
.with_args(self.config.table_io_configs.iter())
.build()?;
let table_op = args.create()?;

Ok(Table::builder_from_catalog(
table_op,
self.clone(),
table_metadata,
update_table.table_name().clone(),
)
.build()?)
Ok(Table::builder_from_catalog(
table_op,
self.clone(),
table_metadata,
update_table.table_name().clone(),
)
.build()?)
})
.map_err(|e| {
icelake::Error::new(ErrorKind::Unexpected, "Failed to update iceberg table.")
.set_source(e)
})
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::path::Path;
use std::sync::OnceLock;

use anyhow::{anyhow, bail, Context};
use jni::objects::{JObject, JString};
use jni::strings::JNIString;
use jni::{InitArgsBuilder, JNIEnv, JNIVersion, JavaVM, NativeMethod};
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
Expand Down Expand Up @@ -209,3 +210,13 @@ pub fn execute_with_jni_env<T>(

f(&mut env)
}

/// A helper method to convert an java object to rust string.
pub fn jobj_to_str(env: &mut JNIEnv<'_>, obj: JObject<'_>) -> anyhow::Result<String> {
if !env.is_instance_of(&obj, "java/lang/String")? {
bail!("Input object is not a java string and can't be converted!")
}
let jstr = JString::from(obj);
let java_str = env.get_string(&jstr)?;
Ok(java_str.to_str()?.to_string())
}

0 comments on commit 914b962

Please sign in to comment.