diff --git a/Cargo.lock b/Cargo.lock index 1a52da7f8ad82..1f60a6052a4dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9581,6 +9581,7 @@ dependencies = [ "cfg-or-panic", "chrono", "expect-test", + "fs-err", "futures", "itertools 0.12.0", "jni", diff --git a/src/jni_core/Cargo.toml b/src/jni_core/Cargo.toml index bb2f8bda463f4..a6de81ccef47d 100644 --- a/src/jni_core/Cargo.toml +++ b/src/jni_core/Cargo.toml @@ -14,6 +14,7 @@ anyhow = "1" bytes = "1" cfg-or-panic = "0.2" chrono = { version = "0.4", default-features = false } +fs-err = "2" futures = { version = "0.3", default-features = false, features = ["alloc"] } itertools = "0.12" jni = "0.21.1" diff --git a/src/jni_core/src/jvm_runtime.rs b/src/jni_core/src/jvm_runtime.rs index 99438f818620a..4af5e5d3563a3 100644 --- a/src/jni_core/src/jvm_runtime.rs +++ b/src/jni_core/src/jvm_runtime.rs @@ -14,11 +14,12 @@ use core::option::Option::Some; use std::ffi::c_void; -use std::fs; -use std::path::Path; +use std::path::PathBuf; use std::sync::OnceLock; -use anyhow::{anyhow, bail, Context}; +use anyhow::{bail, Context}; +use fs_err as fs; +use fs_err::PathExt; use jni::objects::{JObject, JString}; use jni::strings::JNIString; use jni::{InitArgsBuilder, JNIEnv, JNIVersion, JavaVM, NativeMethod}; @@ -32,57 +33,61 @@ use crate::call_method; const DEFAULT_MEMORY_PROPORTION: f64 = 0.07; pub static JVM: JavaVmWrapper = JavaVmWrapper; -static JVM_RESULT: OnceLock> = OnceLock::new(); +static INSTANCE: OnceLock = OnceLock::new(); pub struct JavaVmWrapper; impl JavaVmWrapper { + /// Get the initialized JVM instance. If JVM is not initialized, initialize it first. + /// If JVM cannot be initialized, return an error. pub fn get_or_init(&self) -> anyhow::Result<&'static JavaVM> { - match JVM_RESULT.get_or_init(Self::inner_new) { + match INSTANCE.get_or_try_init(Self::inner_new) { Ok(jvm) => Ok(jvm), Err(e) => { error!(error = %e.as_report(), "jvm not initialized properly"); - // Note: anyhow!(e) doesn't preserve source - // https://github.com/dtolnay/anyhow/issues/341 - Err(anyhow!(e.to_report_string()).context("jvm not initialized properly")) + Err(e.context("jvm not initialized properly")) } } } - fn inner_new() -> anyhow::Result { + /// Get the initialized JVM instance. If JVM is not initialized, return None. + /// + /// Generally `get_or_init` should be preferred. + fn get(&self) -> Option<&'static JavaVM> { + INSTANCE.get() + } + + fn locate_libs_path() -> anyhow::Result { let libs_path = if let Ok(libs_path) = std::env::var("CONNECTOR_LIBS_PATH") { - libs_path + PathBuf::from(libs_path) } else { - tracing::warn!("environment variable CONNECTOR_LIBS_PATH is not specified, so use default path `./libs` instead"); - let path = std::env::current_exe() - .context("unable to get path of current_exe")? + tracing::info!("environment variable CONNECTOR_LIBS_PATH is not specified, use default path `./libs` instead"); + std::env::current_exe() + .and_then(|p| p.fs_err_canonicalize()) // resolve symlink of the current executable + .context("unable to get path of the executable")? .parent() .expect("not root") - .join("./libs"); - path.to_str().expect("should be able to cast").into() + .join("libs") }; - tracing::info!("libs_path = {}", libs_path); - - let dir = Path::new(&libs_path); + // No need to validate the path now, as it will be further checked when calling `fs::read_dir` later. + Ok(libs_path) + } - if !dir.is_dir() { - bail!("CONNECTOR_LIBS_PATH \"{}\" is not a directory", libs_path); - } + fn inner_new() -> anyhow::Result { + let libs_path = Self::locate_libs_path().context("failed to locate connector libs")?; + tracing::info!(path = %libs_path.display(), "located connector libs"); let mut class_vec = vec![]; - if let Ok(entries) = fs::read_dir(dir) { - for entry in entries.flatten() { - let entry_path = entry.path(); - if entry_path.file_name().is_some() { - let path = std::fs::canonicalize(entry_path) - .expect("valid entry_path obtained from fs::read_dir"); - class_vec.push(path.to_str().unwrap().to_string()); - } + let entries = fs::read_dir(&libs_path).context("failed to read connector libs")?; + for entry in entries.flatten() { + let entry_path = entry.path(); + if entry_path.file_name().is_some() { + let path = fs::canonicalize(entry_path) + .expect("invalid entry_path obtained from fs::read_dir"); + class_vec.push(path.to_str().unwrap().to_string()); } - } else { - bail!("failed to read CONNECTOR_LIBS_PATH \"{}\"", libs_path); } let jvm_heap_size = if let Ok(heap_size) = std::env::var("JVM_HEAP_SIZE") { @@ -173,8 +178,8 @@ pub fn register_native_method_for_jvm(jvm: &JavaVM) -> Result<(), jni::errors::E /// Load JVM memory statistics from the runtime. If JVM is not initialized or fail to initialize, /// return zero. pub fn load_jvm_memory_stats() -> (usize, usize) { - match JVM_RESULT.get() { - Some(Ok(jvm)) => { + match JVM.get() { + Some(jvm) => { let result: Result<(usize, usize), jni::errors::Error> = try { let mut env = jvm.attach_current_thread()?;