Skip to content

Commit

Permalink
fix(connector): better locating and error reporting for connector lib…
Browse files Browse the repository at this point in the history
…rary (#15342)

Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: xxchan <[email protected]>
  • Loading branch information
BugenZhao and xxchan authored Feb 29, 2024
1 parent 1fd6937 commit 914aabd
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 33 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/jni_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ anyhow = "1"
bytes = "1"
cfg-or-panic = "0.2"
chrono = { version = "0.4", default-features = false }
fs-err = "2"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = "0.12"
jni = "0.21.1"
Expand Down
71 changes: 38 additions & 33 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

use core::option::Option::Some;
use std::ffi::c_void;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::OnceLock;

use anyhow::{anyhow, bail, Context};
use anyhow::{bail, Context};
use fs_err as fs;
use fs_err::PathExt;
use jni::objects::{JObject, JString};
use jni::strings::JNIString;
use jni::{InitArgsBuilder, JNIEnv, JNIVersion, JavaVM, NativeMethod};
Expand All @@ -32,57 +33,61 @@ use crate::call_method;
const DEFAULT_MEMORY_PROPORTION: f64 = 0.07;

pub static JVM: JavaVmWrapper = JavaVmWrapper;
static JVM_RESULT: OnceLock<anyhow::Result<JavaVM>> = OnceLock::new();
static INSTANCE: OnceLock<JavaVM> = OnceLock::new();

pub struct JavaVmWrapper;

impl JavaVmWrapper {
/// Get the initialized JVM instance. If JVM is not initialized, initialize it first.
/// If JVM cannot be initialized, return an error.
pub fn get_or_init(&self) -> anyhow::Result<&'static JavaVM> {
match JVM_RESULT.get_or_init(Self::inner_new) {
match INSTANCE.get_or_try_init(Self::inner_new) {
Ok(jvm) => Ok(jvm),
Err(e) => {
error!(error = %e.as_report(), "jvm not initialized properly");
// Note: anyhow!(e) doesn't preserve source
// https://github.com/dtolnay/anyhow/issues/341
Err(anyhow!(e.to_report_string()).context("jvm not initialized properly"))
Err(e.context("jvm not initialized properly"))
}
}
}

fn inner_new() -> anyhow::Result<JavaVM> {
/// Get the initialized JVM instance. If JVM is not initialized, return None.
///
/// Generally `get_or_init` should be preferred.
fn get(&self) -> Option<&'static JavaVM> {
INSTANCE.get()
}

fn locate_libs_path() -> anyhow::Result<PathBuf> {
let libs_path = if let Ok(libs_path) = std::env::var("CONNECTOR_LIBS_PATH") {
libs_path
PathBuf::from(libs_path)
} else {
tracing::warn!("environment variable CONNECTOR_LIBS_PATH is not specified, so use default path `./libs` instead");
let path = std::env::current_exe()
.context("unable to get path of current_exe")?
tracing::info!("environment variable CONNECTOR_LIBS_PATH is not specified, use default path `./libs` instead");
std::env::current_exe()
.and_then(|p| p.fs_err_canonicalize()) // resolve symlink of the current executable
.context("unable to get path of the executable")?
.parent()
.expect("not root")
.join("./libs");
path.to_str().expect("should be able to cast").into()
.join("libs")
};

tracing::info!("libs_path = {}", libs_path);

let dir = Path::new(&libs_path);
// No need to validate the path now, as it will be further checked when calling `fs::read_dir` later.
Ok(libs_path)
}

if !dir.is_dir() {
bail!("CONNECTOR_LIBS_PATH \"{}\" is not a directory", libs_path);
}
fn inner_new() -> anyhow::Result<JavaVM> {
let libs_path = Self::locate_libs_path().context("failed to locate connector libs")?;
tracing::info!(path = %libs_path.display(), "located connector libs");

let mut class_vec = vec![];

if let Ok(entries) = fs::read_dir(dir) {
for entry in entries.flatten() {
let entry_path = entry.path();
if entry_path.file_name().is_some() {
let path = std::fs::canonicalize(entry_path)
.expect("valid entry_path obtained from fs::read_dir");
class_vec.push(path.to_str().unwrap().to_string());
}
let entries = fs::read_dir(&libs_path).context("failed to read connector libs")?;
for entry in entries.flatten() {
let entry_path = entry.path();
if entry_path.file_name().is_some() {
let path = fs::canonicalize(entry_path)
.expect("invalid entry_path obtained from fs::read_dir");
class_vec.push(path.to_str().unwrap().to_string());
}
} else {
bail!("failed to read CONNECTOR_LIBS_PATH \"{}\"", libs_path);
}

let jvm_heap_size = if let Ok(heap_size) = std::env::var("JVM_HEAP_SIZE") {
Expand Down Expand Up @@ -173,8 +178,8 @@ pub fn register_native_method_for_jvm(jvm: &JavaVM) -> Result<(), jni::errors::E
/// Load JVM memory statistics from the runtime. If JVM is not initialized or fail to initialize,
/// return zero.
pub fn load_jvm_memory_stats() -> (usize, usize) {
match JVM_RESULT.get() {
Some(Ok(jvm)) => {
match JVM.get() {
Some(jvm) => {
let result: Result<(usize, usize), jni::errors::Error> = try {
let mut env = jvm.attach_current_thread()?;

Expand Down

0 comments on commit 914aabd

Please sign in to comment.