diff --git a/Cargo.lock b/Cargo.lock index 6cb0662b3215..da9a6ca39f81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3812,16 +3812,6 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" -[[package]] -name = "java-locator" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90003f2fd9c52f212c21d8520f1128da0080bad6fff16b68fe6e7f2f0c3780c2" -dependencies = [ - "glob", - "lazy_static", -] - [[package]] name = "jni" version = "0.21.1" @@ -3831,9 +3821,7 @@ dependencies = [ "cesu8", "cfg-if", "combine", - "java-locator", "jni-sys", - "libloading", "log", "thiserror", "walkdir", @@ -6874,7 +6862,6 @@ dependencies = [ "hyper-tls", "icelake", "itertools 0.11.0", - "jni", "jsonschema-transpiler", "madsim-rdkafka", "madsim-tokio", @@ -6898,7 +6885,6 @@ dependencies = [ "rand", "reqwest", "risingwave_common", - "risingwave_jni_core", "risingwave_pb", "risingwave_rpc_client", "rust_decimal", @@ -10079,7 +10065,6 @@ dependencies = [ "hyper", "indexmap 1.9.3", "itertools 0.10.5", - "jni", "lexical-core", "lexical-parse-float", "lexical-parse-integer", diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 246e92c39177..3dc25892a461 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -7,7 +7,6 @@ source ci/scripts/common.sh # prepare environment export CONNECTOR_RPC_ENDPOINT="localhost:50051" -export CONNECTOR_LIBS_PATH="./connector-node/libs" while getopts 'p:' opt; do case ${opt} in diff --git a/docker/Dockerfile b/docker/Dockerfile index d788b4f435a0..03a1e16ef137 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -67,8 +67,6 @@ COPY --from=builder /risingwave/bin/jeprof /usr/local/bin/jeprof ENV PLAYGROUND_PROFILE docker-playground # Set default dashboard UI to local path instead of github proxy ENV RW_DASHBOARD_UI_PATH /risingwave/ui -# Set default connector libs path -ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs ENTRYPOINT [ "/risingwave/bin/risingwave" ] CMD [ "playground" ] diff --git a/docker/Dockerfile.hdfs b/docker/Dockerfile.hdfs index 23b6cf802fc4..d3cd7b34dd29 100644 --- a/docker/Dockerfile.hdfs +++ b/docker/Dockerfile.hdfs @@ -91,8 +91,6 @@ ENV CLASSPATH ${HADOOP_CONF_DIR}:${CLASSPATH} ENV PLAYGROUND_PROFILE docker-playground # Set default dashboard UI to local path instead of github proxy ENV RW_DASHBOARD_UI_PATH /risingwave/ui -# Set default connector libs path -ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs ENTRYPOINT [ "/risingwave/hdfs_env.sh" ] CMD [ "playground" ] diff --git a/java/com_risingwave_java_binding_Binding.h b/java/com_risingwave_java_binding_Binding.h index a3e9aa95ec84..c2c027ed22b5 100644 --- a/java/com_risingwave_java_binding_Binding.h +++ b/java/com_risingwave_java_binding_Binding.h @@ -223,14 +223,6 @@ JNIEXPORT void JNICALL Java_com_risingwave_java_binding_Binding_streamChunkItera JNIEXPORT jlong JNICALL Java_com_risingwave_java_binding_Binding_streamChunkIteratorFromPretty (JNIEnv *, jclass, jstring); -/* - * Class: com_risingwave_java_binding_Binding - * Method: sendCdcSourceMsgToChannel - * Signature: (J[B)Z - */ -JNIEXPORT jboolean JNICALL Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel - (JNIEnv *, jclass, jlong, jbyteArray); - #ifdef __cplusplus } #endif diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java index ba9511b02303..e9fef6e869c0 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java @@ -70,33 +70,6 @@ public static CdcEngineRunner newCdcEngineRunner( return runner; } - public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) { - DbzCdcEngineRunner runner = null; - try { - var sourceId = config.getSourceId(); - var engine = - new DbzCdcEngine( - config.getSourceId(), - config.getResolvedDebeziumProps(), - (success, message, error) -> { - if (!success) { - LOG.error( - "engine#{} terminated with error. message: {}", - sourceId, - message, - error); - } else { - LOG.info("engine#{} stopped normally. {}", sourceId, message); - } - }); - - runner = new DbzCdcEngineRunner(engine); - } catch (Exception e) { - LOG.error("failed to create the CDC engine", e); - } - return runner; - } - /** Start to run the cdc engine */ public void start() { if (isRunning()) { diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java deleted file mode 100644 index b590fdf3da8b..000000000000 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.risingwave.connector.source.core; - -import com.risingwave.connector.api.source.SourceTypeE; -import com.risingwave.connector.source.common.DbzConnectorConfig; -import com.risingwave.java.binding.Binding; -import com.risingwave.metrics.ConnectorNodeMetrics; -import com.risingwave.proto.ConnectorServiceProto; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** handler for starting a debezium source connectors for jni */ -public class JniDbzSourceHandler { - static final Logger LOG = LoggerFactory.getLogger(DbzSourceHandler.class); - - private final DbzConnectorConfig config; - - public JniDbzSourceHandler(DbzConnectorConfig config) { - this.config = config; - } - - public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr) - throws com.google.protobuf.InvalidProtocolBufferException { - var request = - ConnectorServiceProto.GetEventStreamRequest.parseFrom(getEventStreamRequestBytes); - - // For jni.rs - java.lang.Thread.currentThread() - .setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader()); - // userProps extracted from request, underlying implementation is UnmodifiableMap - Map mutableUserProps = new HashMap<>(request.getPropertiesMap()); - mutableUserProps.put("source.id", Long.toString(request.getSourceId())); - var config = - new DbzConnectorConfig( - SourceTypeE.valueOf(request.getSourceType()), - request.getSourceId(), - request.getStartOffset(), - mutableUserProps, - request.getSnapshotDone()); - JniDbzSourceHandler handler = new JniDbzSourceHandler(config); - handler.start(channelPtr); - } - - public void start(long channelPtr) { - var runner = DbzCdcEngineRunner.newCdcEngineRunner(config); - if (runner == null) { - return; - } - - try { - // Start the engine - runner.start(); - LOG.info("Start consuming events of table {}", config.getSourceId()); - - while (runner.isRunning()) { - // check whether the send queue has room for new messages - // Thread will block on the channel to get output from engine - var resp = runner.getEngine().getOutputChannel().poll(500, TimeUnit.MILLISECONDS); - boolean success; - if (resp != null) { - ConnectorNodeMetrics.incSourceRowsReceived( - config.getSourceType().toString(), - String.valueOf(config.getSourceId()), - resp.getEventsCount()); - LOG.info( - "Engine#{}: emit one chunk {} events to network ", - config.getSourceId(), - resp.getEventsCount()); - success = Binding.sendCdcSourceMsgToChannel(channelPtr, resp.toByteArray()); - } else { - // If resp is null means just check whether channel is closed. - success = Binding.sendCdcSourceMsgToChannel(channelPtr, null); - } - if (!success) { - LOG.info( - "Engine#{}: JNI sender broken detected, stop the engine", - config.getSourceId()); - runner.stop(); - return; - } - } - } catch (Throwable t) { - LOG.error("Cdc engine failed.", t); - try { - runner.stop(); - } catch (Exception e) { - LOG.warn("Failed to stop Engine#{}", config.getSourceId(), e); - } - } - } -} diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index 4a79033b147a..3f05768ec74b 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -17,13 +17,8 @@ import io.questdb.jar.jni.JarJniLoader; public class Binding { - private static final boolean IS_EMBEDDED_CONNECTOR = - Boolean.parseBoolean(System.getProperty("is_embedded_connector")); - static { - if (!IS_EMBEDDED_CONNECTOR) { - JarJniLoader.loadLib(Binding.class, "/risingwave/jni", "risingwave_java_binding"); - } + JarJniLoader.loadLib(Binding.class, "/risingwave/jni", "risingwave_java_binding"); } public static native int vnodeCount(); @@ -89,6 +84,4 @@ public class Binding { static native void streamChunkIteratorClose(long pointer); static native long streamChunkIteratorFromPretty(String str); - - public static native boolean sendCdcSourceMsgToChannel(long channelPtr, byte[] msg); } diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 6b0aebf4b0f1..67bc2f35a800 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -55,7 +55,6 @@ hyper = "0.14" hyper-tls = "0.5" icelake = { workspace = true } itertools = "0.11" -jni = { version = "0.21.1", features = ["invocation"] } jsonschema-transpiler = { git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } maplit = "1.0.2" moka = { version = "0.11", features = ["future"] } @@ -90,7 +89,6 @@ rdkafka = { workspace = true, features = [ ] } reqwest = { version = "0.11", features = ["json"] } risingwave_common = { workspace = true } -risingwave_jni_core = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } rust_decimal = "1" diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index f85367d32e5b..d6b82a9d9439 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -13,19 +13,13 @@ // limitations under the License. use std::str::FromStr; -use std::sync::LazyLock; use anyhow::{anyhow, Result}; use async_trait::async_trait; +use futures::pin_mut; use futures_async_stream::try_stream; -use itertools::Itertools; -use jni::objects::JValue; -use prost::Message; use risingwave_common::util::addr::HostAddr; -use risingwave_jni_core::jvm_runtime::JVM; -use risingwave_jni_core::GetEventStreamJniSender; -use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse}; -use tokio::sync::mpsc; +use risingwave_pb::connector_service::GetEventStreamResponse; use crate::parser::ParserConfig; use crate::source::base::SourceMessage; @@ -49,8 +43,6 @@ pub struct CdcSplitReader { source_ctx: SourceContextRef, } -const DEFAULT_CHANNEL_SIZE: usize = 16; - #[async_trait] impl SplitReader for CdcSplitReader { type Properties = CdcProperties; @@ -101,6 +93,10 @@ impl SplitReader for CdcSplitReader { impl CommonSplitReader for CdcSplitReader { #[try_stream(ok = Vec, error = anyhow::Error)] async fn into_data_stream(self) { + let cdc_client = self.source_ctx.connector_client.clone().ok_or_else(|| { + anyhow!("connector node endpoint not specified or unable to connect to connector node") + })?; + // rewrite the hostname and port for the split let mut properties = self.conn_props.props.clone(); @@ -119,54 +115,38 @@ impl CommonSplitReader for CdcSplitReader { properties.insert("table.name".into(), table_name); } - let (tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - - LazyLock::force(&JVM).as_ref()?; - - let get_event_stream_request = GetEventStreamRequest { - source_id: self.source_id, - source_type: self.conn_props.get_source_type_pb() as _, - start_offset: self.start_offset.unwrap_or_default(), - properties, - snapshot_done: self.snapshot_done, - }; - - std::thread::spawn(move || { - let mut env = JVM - .as_ref() - .unwrap() - .attach_current_thread_as_daemon() - .unwrap(); - - let get_event_stream_request_bytes = env - .byte_array_from_slice(&Message::encode_to_vec(&get_event_stream_request)) - .unwrap(); - let result = env.call_static_method( - "com/risingwave/connector/source/core/JniDbzSourceHandler", - "runJniDbzSourceThread", - "([BJ)V", - &[ - JValue::Object(&get_event_stream_request_bytes), - JValue::from(&tx as *const GetEventStreamJniSender as i64), - ], - ); - - match result { - Ok(_) => { - tracing::info!("end of jni call runJniDbzSourceThread"); + let cdc_stream = cdc_client + .start_source_stream( + self.source_id, + self.conn_props.get_source_type_pb(), + self.start_offset, + properties, + self.snapshot_done, + ) + .await + .inspect_err(|err| tracing::error!("connector node start stream error: {}", err))?; + pin_mut!(cdc_stream); + #[for_await] + for event_res in cdc_stream { + match event_res { + Ok(GetEventStreamResponse { events, .. }) => { + if events.is_empty() { + continue; + } + let mut msgs = Vec::with_capacity(events.len()); + for event in events { + msgs.push(SourceMessage::from(event)); + } + yield msgs; } Err(e) => { - tracing::error!("jni call error: {:?}", e); + return Err(anyhow!( + "Cdc service error: code {}, msg {}", + e.code(), + e.message() + )) } } - }); - - while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await { - tracing::debug!("receive events {:?}", events.len()); - let msgs = events.into_iter().map(SourceMessage::from).collect_vec(); - yield msgs; } - - Err(anyhow!("all senders are dropped"))?; } } diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index aa7e564ed1ac..12a3c59fc829 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -12,4 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(error_generic_member_access)] +#![feature(lazy_cell)] +#![feature(once_cell_try)] +#![feature(type_alias_impl_trait)] + pub use risingwave_jni_core::*; diff --git a/src/jni_core/src/jvm_runtime.rs b/src/jni_core/src/jvm_runtime.rs deleted file mode 100644 index 5559abd3ffa3..000000000000 --- a/src/jni_core/src/jvm_runtime.rs +++ /dev/null @@ -1,284 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use core::option::Option::Some; -use std::ffi::c_void; -use std::fs; -use std::path::Path; -use std::sync::LazyLock; - -use jni::strings::JNIString; -use jni::{InitArgsBuilder, JNIVersion, JavaVM, NativeMethod}; -use risingwave_common::error::{ErrorCode, RwError}; -use risingwave_common::util::resource_util::memory::total_memory_available_bytes; - -pub static JVM: LazyLock> = LazyLock::new(|| { - let libs_path = if let Ok(libs_path) = std::env::var("CONNECTOR_LIBS_PATH") { - libs_path - } else { - return Err(ErrorCode::InternalError( - "environment variable CONNECTOR_LIBS_PATH is not specified".to_string(), - ) - .into()); - }; - - let dir = Path::new(&libs_path); - - if !dir.is_dir() { - return Err(ErrorCode::InternalError(format!( - "CONNECTOR_LIBS_PATH \"{}\" is not a directory", - libs_path - )) - .into()); - } - - let mut class_vec = vec![]; - - if let Ok(entries) = fs::read_dir(dir) { - for entry in entries.flatten() { - let entry_path = entry.path(); - if entry_path.file_name().is_some() { - let path = std::fs::canonicalize(entry_path)?; - class_vec.push(path.to_str().unwrap().to_string()); - } - } - } else { - return Err(ErrorCode::InternalError(format!( - "failed to read CONNECTOR_LIBS_PATH \"{}\"", - libs_path - )) - .into()); - } - - let jvm_heap_size = if let Ok(heap_size) = std::env::var("JVM_HEAP_SIZE") { - heap_size - } else { - // Use 10% of total memory by default - format!("{}", total_memory_available_bytes() / 10) - }; - - // Build the VM properties - let args_builder = InitArgsBuilder::new() - // Pass the JNI API version (default is 8) - .version(JNIVersion::V8) - .option("-ea") - .option("-Dis_embedded_connector=true") - .option(format!("-Djava.class.path={}", class_vec.join(":"))) - .option(format!("-Xmx{}", jvm_heap_size)); - - tracing::info!("JVM args: {:?}", args_builder); - let jvm_args = args_builder.build().unwrap(); - - // Create a new VM - let jvm = match JavaVM::new(jvm_args) { - Err(err) => { - tracing::error!("fail to new JVM {:?}", err); - return Err(ErrorCode::InternalError("fail to new JVM".to_string()).into()); - } - Ok(jvm) => jvm, - }; - - tracing::info!("initialize JVM successfully"); - - register_native_method_for_jvm(&jvm); - - Ok(jvm) -}); - -fn register_native_method_for_jvm(jvm: &JavaVM) { - let mut env = jvm - .attach_current_thread() - .inspect_err(|e| tracing::error!("jvm attach thread error: {:?}", e)) - .unwrap(); - - let binding_class = env - .find_class("com/risingwave/java/binding/Binding") - .inspect_err(|e| tracing::error!("jvm find class error: {:?}", e)) - .unwrap(); - env.register_native_methods( - binding_class, - &[ - NativeMethod { - name: JNIString::from("vnodeCount"), - sig: JNIString::from("()I"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_vnodeCount as *mut c_void, - }, - #[cfg(not(madsim))] - NativeMethod { - name: JNIString::from("hummockIteratorNew"), - sig: JNIString::from("([B)J"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_hummockIteratorNew - as *mut c_void, - }, - #[cfg(not(madsim))] - NativeMethod { - name: JNIString::from("hummockIteratorNext"), - sig: JNIString::from("(J)J"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_hummockIteratorNext - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("hummockIteratorClose"), - sig: JNIString::from("(J)V"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_hummockIteratorClose - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetKey"), - sig: JNIString::from("(J)[B"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetKey as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetOp"), - sig: JNIString::from("(J)I"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetOp as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowIsNull"), - sig: JNIString::from("(JI)Z"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowIsNull as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetInt16Value"), - sig: JNIString::from("(JI)S"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetInt16Value - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetInt32Value"), - sig: JNIString::from("(JI)I"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetInt32Value - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetInt64Value"), - sig: JNIString::from("(JI)J"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetInt64Value - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetFloatValue"), - sig: JNIString::from("(JI)F"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetFloatValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetDoubleValue"), - sig: JNIString::from("(JI)D"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetDoubleValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetBooleanValue"), - sig: JNIString::from("(JI)Z"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetBooleanValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetStringValue"), - sig: JNIString::from("(JI)Ljava/lang/String;"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetStringValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetTimestampValue"), - sig: JNIString::from("(JI)Ljava/sql/Timestamp;"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetTimestampValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetDecimalValue"), - sig: JNIString::from("(JI)Ljava/math/BigDecimal;"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetDecimalValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetTimeValue"), - sig: JNIString::from("(JI)Ljava/sql/Time;"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetTimeValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetDateValue"), - sig: JNIString::from("(JI)Ljava/sql/Date;"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetDateValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetIntervalValue"), - sig: JNIString::from("(JI)Ljava/lang/String;"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetIntervalValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetJsonbValue"), - sig: JNIString::from("(JI)Ljava/lang/String;"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetJsonbValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetByteaValue"), - sig: JNIString::from("(JI)[B"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetByteaValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowGetArrayValue"), - sig: JNIString::from("(JILjava/lang/Class;)Ljava/lang/Object;"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetArrayValue - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("rowClose"), - sig: JNIString::from("(J)V"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowClose as *mut c_void, - }, - NativeMethod { - name: JNIString::from("streamChunkIteratorNew"), - sig: JNIString::from("([B)J"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_streamChunkIteratorNew - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("streamChunkIteratorNext"), - sig: JNIString::from("(J)J"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_streamChunkIteratorNext - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("streamChunkIteratorClose"), - sig: JNIString::from("(J)V"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_streamChunkIteratorClose - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("streamChunkIteratorFromPretty"), - sig: JNIString::from("(Ljava/lang/String;)J"), - fn_ptr: - crate::Java_com_risingwave_java_binding_Binding_streamChunkIteratorFromPretty - as *mut c_void, - }, - NativeMethod { - name: JNIString::from("sendCdcSourceMsgToChannel"), - sig: JNIString::from("(J[B)Z"), - fn_ptr: crate::Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel - as *mut c_void, - }, - ], - ) - .inspect_err(|e| tracing::error!("jvm register native methods error: {:?}", e)) - .unwrap(); - - tracing::info!("register native methods for jvm successfully"); -} diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index 6fa6f2f10e99..6dcdc56f3e5f 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -19,7 +19,6 @@ #![feature(result_option_inspect)] pub mod hummock_iterator; -pub mod jvm_runtime; pub mod stream_chunk_iterator; use std::backtrace::Backtrace; @@ -34,9 +33,7 @@ use jni::objects::{ JValue, JValueGen, JValueOwned, ReleaseMode, }; use jni::signature::ReturnType; -use jni::sys::{ - jboolean, jbyte, jdouble, jfloat, jint, jlong, jshort, jsize, jvalue, JNI_FALSE, JNI_TRUE, -}; +use jni::sys::{jboolean, jbyte, jdouble, jfloat, jint, jlong, jshort, jsize, jvalue}; use jni::JNIEnv; use prost::{DecodeError, Message}; use risingwave_common::array::{ArrayError, StreamChunk}; @@ -296,7 +293,6 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount( VirtualNode::COUNT as jint } -#[cfg(not(madsim))] #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorNew<'a>( env: EnvParam<'a>, @@ -309,7 +305,6 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorN }) } -#[cfg(not(madsim))] #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorNext<'a>( env: EnvParam<'a>, @@ -827,43 +822,6 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowClose<'a>( pointer.drop() } -/// Send messages to the channel received by `CdcSplitReader`. -/// If msg is null, just check whether the channel is closed. -/// Return true if sending is successful, otherwise, return false so that caller can stop -/// gracefully. -#[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel<'a>( - env: EnvParam<'a>, - channel: Pointer<'a, GetEventStreamJniSender>, - msg: JByteArray<'a>, -) -> jboolean { - execute_and_catch(env, move |env| { - // If msg is null means just check whether channel is closed. - if msg.is_null() { - if channel.as_ref().is_closed() { - return Ok(JNI_FALSE); - } else { - return Ok(JNI_TRUE); - } - } - - let get_event_stream_response: GetEventStreamResponse = - Message::decode(to_guarded_slice(&msg, env)?.deref())?; - - tracing::debug!("before send"); - match channel.as_ref().blocking_send(get_event_stream_response) { - Ok(_) => { - tracing::debug!("send successfully"); - Ok(JNI_TRUE) - } - Err(e) => { - tracing::debug!("send error. {:?}", e); - Ok(JNI_FALSE) - } - } - }) -} - #[cfg(test)] mod tests { use risingwave_common::types::{DataType, Timestamptz}; diff --git a/src/risedevtool/config/src/main.rs b/src/risedevtool/config/src/main.rs index 876d920109d8..f6088e81d9b3 100644 --- a/src/risedevtool/config/src/main.rs +++ b/src/risedevtool/config/src/main.rs @@ -396,12 +396,6 @@ fn main() -> Result<()> { )?; if chosen.contains(&component) { writeln!(file, "{}=true", component.env())?; - if component == Components::BuildConnectorNode { - writeln!( - file, - "CONNECTOR_LIBS_PATH=.risingwave/bin/connector-node/libs/" - )?; - } } else { writeln!(file, "# {}=true", component.env())?; } diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 603619f3a8b2..c514f5fb221a 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -52,7 +52,6 @@ hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features hyper = { version = "0.14", features = ["full"] } indexmap = { version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } -jni = { version = "0.21", features = ["invocation"] } lexical-core = { version = "0.8", features = ["format"] } lexical-parse-float = { version = "0.8", default-features = false, features = ["format", "std"] } lexical-parse-integer = { version = "0.8", default-features = false, features = ["format", "std"] } @@ -151,7 +150,6 @@ hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features hyper = { version = "0.14", features = ["full"] } indexmap = { version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } -jni = { version = "0.21", features = ["invocation"] } lexical-core = { version = "0.8", features = ["format"] } lexical-parse-float = { version = "0.8", default-features = false, features = ["format", "std"] } lexical-parse-integer = { version = "0.8", default-features = false, features = ["format", "std"] }