Skip to content

Commit

Permalink
change name to get_or_init
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Oct 31, 2023
1 parent aa7af6b commit 7042a94
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 50 deletions.
4 changes: 2 additions & 2 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
}).try_collect()?;

let mut env = JVM
.get()?
.get_or_init()?
.attach_current_thread()
.map_err(|err| SinkError::Internal(err.into()))?;
let validate_sink_request = ValidateSinkRequest {
Expand Down Expand Up @@ -613,7 +613,7 @@ struct EmbeddedConnectorClient {

impl EmbeddedConnectorClient {
fn new() -> Result<Self> {
let jvm = JVM.get().map_err(|e| anyhow!("cannot get jvm: {:?}", e))?;
let jvm = JVM.get_or_init()?;
Ok(EmbeddedConnectorClient { jvm })
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ where
SourceType::from(T::source_type())
);

let mut env = JVM.get()?.attach_current_thread()?;
let mut env = JVM.get_or_init()?.attach_current_thread()?;

let validate_source_request = ValidateSourceRequest {
source_id: context.info.source_id as u64,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
let (mut tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

let jvm = JVM
.get()
.get_or_init()
.map_err(|e| anyhow!("jvm not initialized properly: {:?}", e))?;

let get_event_stream_request = GetEventStreamRequest {
Expand Down
81 changes: 35 additions & 46 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::path::Path;
use std::sync::OnceLock;

use anyhow::anyhow;
use jni::objects::JValueOwned;
use jni::strings::JNIString;
use jni::{InitArgsBuilder, JNIVersion, JavaVM, NativeMethod};
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
Expand All @@ -34,17 +33,7 @@ static JVM_RESULT: OnceLock<Result<JavaVM, String>> = OnceLock::new();
pub struct JavaVmWrapper;

impl JavaVmWrapper {
pub fn for_initialized_jvm<O, F: FnOnce(&'static JavaVM) -> O>(&self, f: F) -> Option<O> {
JVM_RESULT.get().and_then(|result| {
if let Ok(jvm) = result {
Some(f(jvm))
} else {
None
}
})
}

pub fn get(&self) -> anyhow::Result<&'static JavaVM> {
pub fn get_or_init(&self) -> anyhow::Result<&'static JavaVM> {
match JVM_RESULT.get_or_init(|| {
Self::inner_new().inspect_err(|e| error!("failed to init jvm: {:?}", e))
}) {
Expand Down Expand Up @@ -166,41 +155,41 @@ pub fn register_native_method_for_jvm(jvm: &JavaVM) -> Result<(), jni::errors::E

/// Load JVM memory statistics from the runtime. If JVM is not initialized or fail to initialize, return zero.
pub fn load_jvm_memory_stats() -> (usize, usize) {
JVM.for_initialized_jvm(|jvm| {
let result: Result<(usize, usize), jni::errors::Error> = try {
let mut env = jvm.attach_current_thread()?;

let runtime_instance = env.call_static_method(
"java/lang/Runtime",
"getRuntime",
"()Ljava/lang/Runtime;",
&[],
)?;

let runtime_instance = match runtime_instance {
JValueOwned::Object(o) => o,
_ => unreachable!(),
match JVM_RESULT.get() {
Some(Ok(jvm)) => {
let result: Result<(usize, usize), jni::errors::Error> = try {
let mut env = jvm.attach_current_thread()?;

let runtime_instance = env
.call_static_method(
"java/lang/Runtime",
"getRuntime",
"()Ljava/lang/Runtime;",
&[],
)?
.l()
.expect("should be object");

let total_memory = env
.call_method(runtime_instance.as_ref(), "totalMemory", "()J", &[])?
.j()
.expect("should be long");

let free_memory = env
.call_method(runtime_instance, "freeMemory", "()J", &[])?
.j()
.expect("should be long");

(total_memory as usize, (total_memory - free_memory) as usize)
};

let total_memory = env
.call_method(runtime_instance.as_ref(), "totalMemory", "()J", &[])?
.j()
.expect("should be long");

let free_memory = env
.call_method(runtime_instance, "freeMemory", "()J", &[])?
.j()
.expect("should be long");

(total_memory as usize, (total_memory - free_memory) as usize)
};
match result {
Ok(ret) => ret,
Err(e) => {
error!("failed to collect jvm stats: {:?}", e);
(0, 0)
match result {
Ok(ret) => ret,
Err(e) => {
error!("failed to collect jvm stats: {:?}", e);
(0, 0)
}
}
}
})
.unwrap_or((0, 0))
_ => (0, 0),
}
}

0 comments on commit 7042a94

Please sign in to comment.