Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(jni): remove jni_core's dependency on storage #17193

Merged
merged 5 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,15 @@ public class Binding {
}
}

static void ensureInitialized() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The native method iteratorNewHummock is moved to the HummockIterator class. We should ensure that this Binding class has been initialized before we use the HummockIterator class. This method is called in the static block of HummockIterator to ensure that the Binding class is loaded and initialized, and otherwise the shared library won't be loaded when we use HummockIterator and causes error.


public static native void tracingSlf4jEvent(
String threadName, String name, int level, String message);

public static native boolean tracingSlf4jEventEnabled(int level);

public static native int vnodeCount();

// hummock iterator method
// Return a pointer to the iterator
static native long iteratorNewHummock(byte[] readPlan);

static native long iteratorNewStreamChunk(long pointer);

static native boolean iteratorNext(long pointer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@ public class HummockIterator implements AutoCloseable {
private final long pointer;
private boolean isClosed;

static {
Binding.ensureInitialized();
}

// hummock iterator method
// Return a pointer to the iterator
private static native long iteratorNewHummock(byte[] readPlan);

public HummockIterator(ReadPlan readPlan) {
this.pointer = Binding.iteratorNewHummock(readPlan.toByteArray());
this.pointer = iteratorNewHummock(readPlan.toByteArray());
this.isClosed = false;
}

Expand Down
19 changes: 19 additions & 0 deletions src/java_binding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,32 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
bytes = "1"
cfg-or-panic = "0.2"
foyer ={ workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
jni = "0.21.1"
prost = { workspace = true }
risingwave_common = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_object_store = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_storage = { workspace = true }
rw_futures_util = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
"rt",
"rt-multi-thread",
"sync",
"macros",
"time",
"signal",
] }
tracing = "0.1"

[dev-dependencies]
risingwave_expr = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

use std::sync::Arc;

use anyhow::anyhow;
use bytes::Bytes;
use foyer::HybridCacheBuilder;
use futures::{Stream, TryFutureExt, TryStreamExt};
use futures::{TryFutureExt, TryStreamExt};
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
Expand All @@ -25,6 +26,7 @@ use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAw
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_jni_core::HummockJavaBindingIterator;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_pb::java_binding::key_range::Bound;
Expand All @@ -39,35 +41,34 @@ use risingwave_storage::hummock::{
use risingwave_storage::monitor::{global_hummock_state_store_metrics, HummockStateStoreMetrics};
use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
use risingwave_storage::store::{ReadOptions, StateStoreIterExt};
use risingwave_storage::table::KeyedRow;
use rw_futures_util::select_all;
use tokio::sync::mpsc::unbounded_channel;

type SelectAllIterStream = impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Unpin;
type SingleIterStream = impl Stream<Item = StorageResult<KeyedRow<Bytes>>>;
type SingleIterStream = HummockJavaBindingIterator;

fn select_all_vnode_stream(streams: Vec<SingleIterStream>) -> SelectAllIterStream {
select_all(streams.into_iter().map(Box::pin))
fn select_all_vnode_stream(streams: Vec<SingleIterStream>) -> HummockJavaBindingIterator {
Box::pin(select_all(streams))
}

fn to_deserialized_stream(
iter: HummockStorageIterator,
row_serde: EitherSerde,
) -> SingleIterStream {
iter.into_stream(move |(key, value)| {
Ok(KeyedRow::new(
key.user_key.table_key.copy_into(),
row_serde.deserialize(value).map(OwnedRow::new)?,
))
})
Box::pin(
iter.into_stream(move |(key, value)| {
Ok((
Bytes::copy_from_slice(key.user_key.table_key.0),
row_serde.deserialize(value).map(OwnedRow::new)?,
))
})
.map_err(|e| anyhow!(e)),
)
}

pub struct HummockJavaBindingIterator {
stream: SelectAllIterStream,
}

impl HummockJavaBindingIterator {
pub async fn new(read_plan: ReadPlan) -> StorageResult<Self> {
pub(crate) async fn new_hummock_java_binding_iter(
read_plan: ReadPlan,
) -> StorageResult<HummockJavaBindingIterator> {
{
// Note(bugen): should we forward the implementation to the `StorageTable`?
let object_store = Arc::new(
build_remote_object_store(
Expand Down Expand Up @@ -170,11 +171,7 @@ impl HummockJavaBindingIterator {

let stream = select_all_vnode_stream(streams);

Ok(Self { stream })
}

pub async fn next(&mut self) -> StorageResult<Option<KeyedRow<Bytes>>> {
self.stream.try_next().await
Ok(stream)
}
}

Expand Down
75 changes: 71 additions & 4 deletions src/java_binding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,83 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(type_alias_impl_trait)]
#![feature(try_blocks)]

mod hummock_iterator;
use std::ffi::c_void;
use std::ops::Deref;

use anyhow::anyhow;
use cfg_or_panic::cfg_or_panic;
use jni::objects::JByteArray;
use jni::sys::{jint, JNI_VERSION_1_2};
use jni::JavaVM;
use risingwave_jni_core::register_native_method_for_jvm;
use jni::{JNIEnv, JavaVM};
use prost::Message;
use risingwave_common::error::AsReport;
use risingwave_jni_core::jvm_runtime::{jvm_env, register_java_binding_native_methods};
use risingwave_jni_core::{
execute_and_catch, gen_class_name, to_guarded_slice, EnvParam, JavaBindingIterator, Pointer,
JAVA_BINDING_ASYNC_RUNTIME,
};

use crate::hummock_iterator::new_hummock_java_binding_iter;

fn register_hummock_java_binding_native_methods(
env: &mut JNIEnv<'_>,
) -> Result<(), jni::errors::Error> {
let binding_class = env
.find_class(gen_class_name!(com.risingwave.java.binding.HummockIterator))
.inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm find class error"))?;
macro_rules! gen_native_method_array {
() => {{
risingwave_jni_core::split_extract_plain_native_methods! {{long iteratorNewHummock(byte[] readPlan);}, gen_native_method_array}
}};
({$({ $func_name:ident, {$($ret:tt)+}, {$($args:tt)*} })*}) => {
[
$(
risingwave_jni_core::gen_native_method_entry! {
Java_com_risingwave_java_binding_HummockIterator_, $func_name, {$($ret)+}, {$($args)*}
},
)*
]
}
}
env.register_native_methods(binding_class, &gen_native_method_array!())
.inspect_err(
|e| tracing::error!(error = ?e.as_report(), "jvm register native methods error"),
)?;

tracing::info!("register native methods for jvm successfully");
Ok(())
}

#[no_mangle]
#[allow(non_snake_case)]
pub extern "system" fn JNI_OnLoad(jvm: JavaVM, _reserved: *mut c_void) -> jint {
let _ = register_native_method_for_jvm(&jvm)
.inspect_err(|_e| eprintln!("unable to register native method"));
let result: Result<(), jni::errors::Error> = try {
let mut env = jvm_env(&jvm)?;
register_java_binding_native_methods(&mut env)?;
register_hummock_java_binding_native_methods(&mut env)?;
};
let _ =
result.inspect_err(|e| eprintln!("unable to register native method: {:?}", e.as_report()));

JNI_VERSION_1_2
}

#[cfg_or_panic(not(madsim))]
#[no_mangle]
extern "system" fn Java_com_risingwave_java_binding_HummockIterator_iteratorNewHummock<'a>(
env: EnvParam<'a>,
read_plan: JByteArray<'a>,
) -> Pointer<'static, JavaBindingIterator<'static>> {
execute_and_catch(env, move |env| {
let read_plan = Message::decode(to_guarded_slice(&read_plan, env)?.deref())?;
let iter = JAVA_BINDING_ASYNC_RUNTIME
.block_on(new_hummock_java_binding_iter(read_plan))
.map_err(|e| anyhow!(e))?;
let iter = JavaBindingIterator::new_hummock_iter(iter);
Ok(iter.into())
})
}
3 changes: 0 additions & 3 deletions src/jni_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ jni = { version = "0.21.1", features = ["invocation"] }
paste = "1"
prost = { workspace = true }
risingwave_common = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_object_store = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_storage = { workspace = true }
rw_futures_util = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
33 changes: 17 additions & 16 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use anyhow::{bail, Context};
use fs_err as fs;
use fs_err::PathExt;
use jni::objects::{JObject, JString};
use jni::strings::JNIString;
use jni::{InitArgsBuilder, JNIEnv, JNIVersion, JavaVM, NativeMethod};
use jni::{AttachGuard, InitArgsBuilder, JNIEnv, JNIVersion, JavaVM};
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use thiserror_ext::AsReport;
use tracing::error;
Expand Down Expand Up @@ -122,19 +121,27 @@ impl JavaVmWrapper {

tracing::info!("initialize JVM successfully");

register_native_method_for_jvm(&jvm).context("failed to register native method")?;
let result: std::result::Result<(), jni::errors::Error> = try {
let mut env = jvm_env(&jvm)?;
register_java_binding_native_methods(&mut env)?;
};

result.context("failed to register native method")?;

Ok(jvm)
}
}

pub fn register_native_method_for_jvm(jvm: &JavaVM) -> Result<(), jni::errors::Error> {
let mut env = jvm
.attach_current_thread()
.inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm attach thread error"))?;
pub fn jvm_env(jvm: &JavaVM) -> Result<AttachGuard<'_>, jni::errors::Error> {
jvm.attach_current_thread()
.inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm attach thread error"))
}

pub fn register_java_binding_native_methods(
env: &mut JNIEnv<'_>,
) -> Result<(), jni::errors::Error> {
let binding_class = env
.find_class("com/risingwave/java/binding/Binding")
.find_class(gen_class_name!(com.risingwave.java.binding.Binding))
.inspect_err(|e| tracing::error!(error = ?e.as_report(), "jvm find class error"))?;
use crate::*;
macro_rules! gen_native_method_array {
Expand All @@ -144,14 +151,8 @@ pub fn register_native_method_for_jvm(jvm: &JavaVM) -> Result<(), jni::errors::E
({$({ $func_name:ident, {$($ret:tt)+}, {$($args:tt)*} })*}) => {
[
$(
{
let fn_ptr = paste::paste! {[<Java_com_risingwave_java_binding_Binding_ $func_name> ]} as *mut c_void;
let sig = $crate::gen_jni_sig! { {$($ret)+}, {$($args)*}};
NativeMethod {
name: JNIString::from(stringify! {$func_name}),
sig: JNIString::from(sig),
fn_ptr,
}
$crate::gen_native_method_entry! {
Java_com_risingwave_java_binding_Binding_, $func_name, {$($ret)+}, {$($args)*}
},
)*
]
Expand Down
Loading
Loading