From fb0622dee812b3b081299f533c8348e73e534171 Mon Sep 17 00:00:00 2001 From: Dylan Date: Tue, 12 Sep 2023 18:30:55 +0800 Subject: [PATCH] feat(connector): init embedded connector node (#12122) Co-authored-by: Eric Fu Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- Cargo.lock | 15 + ci/scripts/e2e-source-test.sh | 1 + docker/Dockerfile | 2 + docker/Dockerfile.hdfs | 2 + java/com_risingwave_java_binding_Binding.h | 8 + .../source/core/DbzCdcEngineRunner.java | 27 ++ .../source/core/JniDbzSourceHandler.java | 107 +++++++ .../com/risingwave/java/binding/Binding.java | 9 +- src/connector/Cargo.toml | 2 + src/connector/src/source/cdc/source/reader.rs | 88 +++--- src/java_binding/src/lib.rs | 5 - src/jni_core/src/jvm_runtime.rs | 284 ++++++++++++++++++ src/jni_core/src/lib.rs | 44 ++- src/risedevtool/config/src/main.rs | 6 + src/workspace-hack/Cargo.toml | 2 + 15 files changed, 561 insertions(+), 41 deletions(-) create mode 100644 java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java create mode 100644 src/jni_core/src/jvm_runtime.rs diff --git a/Cargo.lock b/Cargo.lock index 5d8c2132144d0..ab1d0077c8386 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3846,6 +3846,16 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +[[package]] +name = "java-locator" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90003f2fd9c52f212c21d8520f1128da0080bad6fff16b68fe6e7f2f0c3780c2" +dependencies = [ + "glob", + "lazy_static", +] + [[package]] name = "jni" version = "0.21.1" @@ -3855,7 +3865,9 @@ dependencies = [ "cesu8", "cfg-if", "combine", + "java-locator", "jni-sys", + "libloading", "log", "thiserror", "walkdir", @@ -6890,6 +6902,7 @@ dependencies = [ "hyper-tls", "icelake", "itertools 0.11.0", + "jni", "jsonschema-transpiler", "madsim-rdkafka", "madsim-tokio", @@ -6913,6 +6926,7 @@ dependencies = [ "rand", "reqwest", "risingwave_common", + "risingwave_jni_core", "risingwave_pb", "risingwave_rpc_client", "rust_decimal", @@ -10102,6 +10116,7 @@ dependencies = [ "hyper", "indexmap 1.9.3", "itertools 0.10.5", + "jni", "lexical-core", "lexical-parse-float", "lexical-parse-integer", diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 3dc25892a4615..246e92c391776 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -7,6 +7,7 @@ source ci/scripts/common.sh # prepare environment export CONNECTOR_RPC_ENDPOINT="localhost:50051" +export CONNECTOR_LIBS_PATH="./connector-node/libs" while getopts 'p:' opt; do case ${opt} in diff --git a/docker/Dockerfile b/docker/Dockerfile index c665735a07718..1e467d4feac2e 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -64,6 +64,8 @@ COPY --from=builder /risingwave/bin/jeprof /usr/local/bin/jeprof ENV PLAYGROUND_PROFILE docker-playground # Set default dashboard UI to local path instead of github proxy ENV RW_DASHBOARD_UI_PATH /risingwave/ui +# Set default connector libs path +ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs ENTRYPOINT [ "/risingwave/bin/risingwave" ] CMD [ "playground" ] diff --git a/docker/Dockerfile.hdfs b/docker/Dockerfile.hdfs index b312438ba80ee..7d22f7b516d48 100644 --- a/docker/Dockerfile.hdfs +++ b/docker/Dockerfile.hdfs @@ -88,6 +88,8 @@ ENV CLASSPATH ${HADOOP_CONF_DIR}:${CLASSPATH} ENV PLAYGROUND_PROFILE docker-playground # Set default dashboard UI to local path instead of github proxy ENV RW_DASHBOARD_UI_PATH /risingwave/ui +# Set default connector libs path +ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs ENTRYPOINT [ "/risingwave/hdfs_env.sh" ] CMD [ "playground" ] diff --git a/java/com_risingwave_java_binding_Binding.h b/java/com_risingwave_java_binding_Binding.h index c2c027ed22b58..a3e9aa95ec84e 100644 --- a/java/com_risingwave_java_binding_Binding.h +++ b/java/com_risingwave_java_binding_Binding.h @@ -223,6 +223,14 @@ JNIEXPORT void JNICALL Java_com_risingwave_java_binding_Binding_streamChunkItera JNIEXPORT jlong JNICALL Java_com_risingwave_java_binding_Binding_streamChunkIteratorFromPretty (JNIEnv *, jclass, jstring); +/* + * Class: com_risingwave_java_binding_Binding + * Method: sendCdcSourceMsgToChannel + * Signature: (J[B)Z + */ +JNIEXPORT jboolean JNICALL Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel + (JNIEnv *, jclass, jlong, jbyteArray); + #ifdef __cplusplus } #endif diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java index e9fef6e869c04..ba9511b02303b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java @@ -70,6 +70,33 @@ public static CdcEngineRunner newCdcEngineRunner( return runner; } + public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) { + DbzCdcEngineRunner runner = null; + try { + var sourceId = config.getSourceId(); + var engine = + new DbzCdcEngine( + config.getSourceId(), + config.getResolvedDebeziumProps(), + (success, message, error) -> { + if (!success) { + LOG.error( + "engine#{} terminated with error. message: {}", + sourceId, + message, + error); + } else { + LOG.info("engine#{} stopped normally. {}", sourceId, message); + } + }); + + runner = new DbzCdcEngineRunner(engine); + } catch (Exception e) { + LOG.error("failed to create the CDC engine", e); + } + return runner; + } + /** Start to run the cdc engine */ public void start() { if (isRunning()) { diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java new file mode 100644 index 0000000000000..b590fdf3da8b5 --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java @@ -0,0 +1,107 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.connector.source.core; + +import com.risingwave.connector.api.source.SourceTypeE; +import com.risingwave.connector.source.common.DbzConnectorConfig; +import com.risingwave.java.binding.Binding; +import com.risingwave.metrics.ConnectorNodeMetrics; +import com.risingwave.proto.ConnectorServiceProto; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** handler for starting a debezium source connectors for jni */ +public class JniDbzSourceHandler { + static final Logger LOG = LoggerFactory.getLogger(DbzSourceHandler.class); + + private final DbzConnectorConfig config; + + public JniDbzSourceHandler(DbzConnectorConfig config) { + this.config = config; + } + + public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr) + throws com.google.protobuf.InvalidProtocolBufferException { + var request = + ConnectorServiceProto.GetEventStreamRequest.parseFrom(getEventStreamRequestBytes); + + // For jni.rs + java.lang.Thread.currentThread() + .setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader()); + // userProps extracted from request, underlying implementation is UnmodifiableMap + Map mutableUserProps = new HashMap<>(request.getPropertiesMap()); + mutableUserProps.put("source.id", Long.toString(request.getSourceId())); + var config = + new DbzConnectorConfig( + SourceTypeE.valueOf(request.getSourceType()), + request.getSourceId(), + request.getStartOffset(), + mutableUserProps, + request.getSnapshotDone()); + JniDbzSourceHandler handler = new JniDbzSourceHandler(config); + handler.start(channelPtr); + } + + public void start(long channelPtr) { + var runner = DbzCdcEngineRunner.newCdcEngineRunner(config); + if (runner == null) { + return; + } + + try { + // Start the engine + runner.start(); + LOG.info("Start consuming events of table {}", config.getSourceId()); + + while (runner.isRunning()) { + // check whether the send queue has room for new messages + // Thread will block on the channel to get output from engine + var resp = runner.getEngine().getOutputChannel().poll(500, TimeUnit.MILLISECONDS); + boolean success; + if (resp != null) { + ConnectorNodeMetrics.incSourceRowsReceived( + config.getSourceType().toString(), + String.valueOf(config.getSourceId()), + resp.getEventsCount()); + LOG.info( + "Engine#{}: emit one chunk {} events to network ", + config.getSourceId(), + resp.getEventsCount()); + success = Binding.sendCdcSourceMsgToChannel(channelPtr, resp.toByteArray()); + } else { + // If resp is null means just check whether channel is closed. + success = Binding.sendCdcSourceMsgToChannel(channelPtr, null); + } + if (!success) { + LOG.info( + "Engine#{}: JNI sender broken detected, stop the engine", + config.getSourceId()); + runner.stop(); + return; + } + } + } catch (Throwable t) { + LOG.error("Cdc engine failed.", t); + try { + runner.stop(); + } catch (Exception e) { + LOG.warn("Failed to stop Engine#{}", config.getSourceId(), e); + } + } + } +} diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index 3f05768ec74b8..4a79033b147a8 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -17,8 +17,13 @@ import io.questdb.jar.jni.JarJniLoader; public class Binding { + private static final boolean IS_EMBEDDED_CONNECTOR = + Boolean.parseBoolean(System.getProperty("is_embedded_connector")); + static { - JarJniLoader.loadLib(Binding.class, "/risingwave/jni", "risingwave_java_binding"); + if (!IS_EMBEDDED_CONNECTOR) { + JarJniLoader.loadLib(Binding.class, "/risingwave/jni", "risingwave_java_binding"); + } } public static native int vnodeCount(); @@ -84,4 +89,6 @@ public class Binding { static native void streamChunkIteratorClose(long pointer); static native long streamChunkIteratorFromPretty(String str); + + public static native boolean sendCdcSourceMsgToChannel(long channelPtr, byte[] msg); } diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 33e32c5685113..c834452352f5f 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -53,6 +53,7 @@ hyper = "0.14" hyper-tls = "0.5" icelake = { workspace = true } itertools = "0.11" +jni = { version = "0.21.1", features = ["invocation"] } jsonschema-transpiler = "1.10.0" maplit = "1.0.2" moka = { version = "0.11", features = ["future"] } @@ -83,6 +84,7 @@ rdkafka = { workspace = true, features = [ ] } reqwest = { version = "0.11", features = ["json"] } risingwave_common = { workspace = true } +risingwave_jni_core = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } rust_decimal = "1" diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 974ec8877d2f6..6c76d838af128 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -13,13 +13,19 @@ // limitations under the License. use std::str::FromStr; +use std::sync::LazyLock; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use futures::pin_mut; use futures_async_stream::try_stream; +use itertools::Itertools; +use jni::objects::JValue; +use prost::Message; use risingwave_common::util::addr::HostAddr; -use risingwave_pb::connector_service::GetEventStreamResponse; +use risingwave_jni_core::jvm_runtime::JVM; +use risingwave_jni_core::GetEventStreamJniSender; +use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse}; +use tokio::sync::mpsc; use crate::parser::ParserConfig; use crate::source::base::SourceMessage; @@ -44,6 +50,8 @@ pub struct CdcSplitReader { source_ctx: SourceContextRef, } +const DEFAULT_CHANNEL_SIZE: usize = 16; + #[async_trait] impl SplitReader for CdcSplitReader where @@ -99,10 +107,6 @@ where { #[try_stream(ok = Vec, error = anyhow::Error)] async fn into_data_stream(self) { - let cdc_client = self.source_ctx.connector_client.clone().ok_or_else(|| { - anyhow!("connector node endpoint not specified or unable to connect to connector node") - })?; - // rewrite the hostname and port for the split let mut properties = self.conn_props.props.clone(); @@ -121,38 +125,54 @@ where properties.insert("table.name".into(), table_name); } - let cdc_stream = cdc_client - .start_source_stream( - self.source_id, - self.conn_props.get_source_type_pb(), - self.start_offset, - properties, - self.snapshot_done, - ) - .await - .inspect_err(|err| tracing::error!("connector node start stream error: {}", err))?; - pin_mut!(cdc_stream); - #[for_await] - for event_res in cdc_stream { - match event_res { - Ok(GetEventStreamResponse { events, .. }) => { - if events.is_empty() { - continue; - } - let mut msgs = Vec::with_capacity(events.len()); - for event in events { - msgs.push(SourceMessage::from(event)); - } - yield msgs; + let (tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); + + LazyLock::force(&JVM).as_ref()?; + + let get_event_stream_request = GetEventStreamRequest { + source_id: self.source_id, + source_type: self.conn_props.get_source_type_pb() as _, + start_offset: self.start_offset.unwrap_or_default(), + properties, + snapshot_done: self.snapshot_done, + }; + + std::thread::spawn(move || { + let mut env = JVM + .as_ref() + .unwrap() + .attach_current_thread_as_daemon() + .unwrap(); + + let get_event_stream_request_bytes = env + .byte_array_from_slice(&Message::encode_to_vec(&get_event_stream_request)) + .unwrap(); + let result = env.call_static_method( + "com/risingwave/connector/source/core/JniDbzSourceHandler", + "runJniDbzSourceThread", + "([BJ)V", + &[ + JValue::Object(&get_event_stream_request_bytes), + JValue::from(&tx as *const GetEventStreamJniSender as i64), + ], + ); + + match result { + Ok(_) => { + tracing::info!("end of jni call runJniDbzSourceThread"); } Err(e) => { - return Err(anyhow!( - "Cdc service error: code {}, msg {}", - e.code(), - e.message() - )) + tracing::error!("jni call error: {:?}", e); } } + }); + + while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await { + tracing::debug!("receive events {:?}", events.len()); + let msgs = events.into_iter().map(SourceMessage::from).collect_vec(); + yield msgs; } + + Err(anyhow!("all senders are dropped"))?; } } diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index 12a3c59fc829f..aa7e564ed1ace 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -12,9 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(error_generic_member_access)] -#![feature(lazy_cell)] -#![feature(once_cell_try)] -#![feature(type_alias_impl_trait)] - pub use risingwave_jni_core::*; diff --git a/src/jni_core/src/jvm_runtime.rs b/src/jni_core/src/jvm_runtime.rs new file mode 100644 index 0000000000000..5559abd3ffa3f --- /dev/null +++ b/src/jni_core/src/jvm_runtime.rs @@ -0,0 +1,284 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use core::option::Option::Some; +use std::ffi::c_void; +use std::fs; +use std::path::Path; +use std::sync::LazyLock; + +use jni::strings::JNIString; +use jni::{InitArgsBuilder, JNIVersion, JavaVM, NativeMethod}; +use risingwave_common::error::{ErrorCode, RwError}; +use risingwave_common::util::resource_util::memory::total_memory_available_bytes; + +pub static JVM: LazyLock> = LazyLock::new(|| { + let libs_path = if let Ok(libs_path) = std::env::var("CONNECTOR_LIBS_PATH") { + libs_path + } else { + return Err(ErrorCode::InternalError( + "environment variable CONNECTOR_LIBS_PATH is not specified".to_string(), + ) + .into()); + }; + + let dir = Path::new(&libs_path); + + if !dir.is_dir() { + return Err(ErrorCode::InternalError(format!( + "CONNECTOR_LIBS_PATH \"{}\" is not a directory", + libs_path + )) + .into()); + } + + let mut class_vec = vec![]; + + if let Ok(entries) = fs::read_dir(dir) { + for entry in entries.flatten() { + let entry_path = entry.path(); + if entry_path.file_name().is_some() { + let path = std::fs::canonicalize(entry_path)?; + class_vec.push(path.to_str().unwrap().to_string()); + } + } + } else { + return Err(ErrorCode::InternalError(format!( + "failed to read CONNECTOR_LIBS_PATH \"{}\"", + libs_path + )) + .into()); + } + + let jvm_heap_size = if let Ok(heap_size) = std::env::var("JVM_HEAP_SIZE") { + heap_size + } else { + // Use 10% of total memory by default + format!("{}", total_memory_available_bytes() / 10) + }; + + // Build the VM properties + let args_builder = InitArgsBuilder::new() + // Pass the JNI API version (default is 8) + .version(JNIVersion::V8) + .option("-ea") + .option("-Dis_embedded_connector=true") + .option(format!("-Djava.class.path={}", class_vec.join(":"))) + .option(format!("-Xmx{}", jvm_heap_size)); + + tracing::info!("JVM args: {:?}", args_builder); + let jvm_args = args_builder.build().unwrap(); + + // Create a new VM + let jvm = match JavaVM::new(jvm_args) { + Err(err) => { + tracing::error!("fail to new JVM {:?}", err); + return Err(ErrorCode::InternalError("fail to new JVM".to_string()).into()); + } + Ok(jvm) => jvm, + }; + + tracing::info!("initialize JVM successfully"); + + register_native_method_for_jvm(&jvm); + + Ok(jvm) +}); + +fn register_native_method_for_jvm(jvm: &JavaVM) { + let mut env = jvm + .attach_current_thread() + .inspect_err(|e| tracing::error!("jvm attach thread error: {:?}", e)) + .unwrap(); + + let binding_class = env + .find_class("com/risingwave/java/binding/Binding") + .inspect_err(|e| tracing::error!("jvm find class error: {:?}", e)) + .unwrap(); + env.register_native_methods( + binding_class, + &[ + NativeMethod { + name: JNIString::from("vnodeCount"), + sig: JNIString::from("()I"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_vnodeCount as *mut c_void, + }, + #[cfg(not(madsim))] + NativeMethod { + name: JNIString::from("hummockIteratorNew"), + sig: JNIString::from("([B)J"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_hummockIteratorNew + as *mut c_void, + }, + #[cfg(not(madsim))] + NativeMethod { + name: JNIString::from("hummockIteratorNext"), + sig: JNIString::from("(J)J"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_hummockIteratorNext + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("hummockIteratorClose"), + sig: JNIString::from("(J)V"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_hummockIteratorClose + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetKey"), + sig: JNIString::from("(J)[B"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetKey as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetOp"), + sig: JNIString::from("(J)I"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetOp as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowIsNull"), + sig: JNIString::from("(JI)Z"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowIsNull as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetInt16Value"), + sig: JNIString::from("(JI)S"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetInt16Value + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetInt32Value"), + sig: JNIString::from("(JI)I"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetInt32Value + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetInt64Value"), + sig: JNIString::from("(JI)J"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetInt64Value + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetFloatValue"), + sig: JNIString::from("(JI)F"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetFloatValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetDoubleValue"), + sig: JNIString::from("(JI)D"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetDoubleValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetBooleanValue"), + sig: JNIString::from("(JI)Z"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetBooleanValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetStringValue"), + sig: JNIString::from("(JI)Ljava/lang/String;"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetStringValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetTimestampValue"), + sig: JNIString::from("(JI)Ljava/sql/Timestamp;"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetTimestampValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetDecimalValue"), + sig: JNIString::from("(JI)Ljava/math/BigDecimal;"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetDecimalValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetTimeValue"), + sig: JNIString::from("(JI)Ljava/sql/Time;"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetTimeValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetDateValue"), + sig: JNIString::from("(JI)Ljava/sql/Date;"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetDateValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetIntervalValue"), + sig: JNIString::from("(JI)Ljava/lang/String;"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetIntervalValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetJsonbValue"), + sig: JNIString::from("(JI)Ljava/lang/String;"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetJsonbValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetByteaValue"), + sig: JNIString::from("(JI)[B"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetByteaValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowGetArrayValue"), + sig: JNIString::from("(JILjava/lang/Class;)Ljava/lang/Object;"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowGetArrayValue + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("rowClose"), + sig: JNIString::from("(J)V"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_rowClose as *mut c_void, + }, + NativeMethod { + name: JNIString::from("streamChunkIteratorNew"), + sig: JNIString::from("([B)J"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_streamChunkIteratorNew + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("streamChunkIteratorNext"), + sig: JNIString::from("(J)J"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_streamChunkIteratorNext + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("streamChunkIteratorClose"), + sig: JNIString::from("(J)V"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_streamChunkIteratorClose + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("streamChunkIteratorFromPretty"), + sig: JNIString::from("(Ljava/lang/String;)J"), + fn_ptr: + crate::Java_com_risingwave_java_binding_Binding_streamChunkIteratorFromPretty + as *mut c_void, + }, + NativeMethod { + name: JNIString::from("sendCdcSourceMsgToChannel"), + sig: JNIString::from("(J[B)Z"), + fn_ptr: crate::Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel + as *mut c_void, + }, + ], + ) + .inspect_err(|e| tracing::error!("jvm register native methods error: {:?}", e)) + .unwrap(); + + tracing::info!("register native methods for jvm successfully"); +} diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index be350ae57a460..a8f0e5a683e35 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -19,6 +19,7 @@ #![feature(result_option_inspect)] pub mod hummock_iterator; +pub mod jvm_runtime; pub mod stream_chunk_iterator; use std::backtrace::Backtrace; @@ -33,7 +34,9 @@ use jni::objects::{ JValue, JValueGen, JValueOwned, ReleaseMode, }; use jni::signature::ReturnType; -use jni::sys::{jboolean, jbyte, jdouble, jfloat, jint, jlong, jshort, jsize, jvalue}; +use jni::sys::{ + jboolean, jbyte, jdouble, jfloat, jint, jlong, jshort, jsize, jvalue, JNI_FALSE, JNI_TRUE, +}; use jni::JNIEnv; use prost::{DecodeError, Message}; use risingwave_common::array::{ArrayError, StreamChunk}; @@ -293,6 +296,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount( VirtualNode::COUNT as jint } +#[cfg(not(madsim))] #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorNew<'a>( env: EnvParam<'a>, @@ -305,6 +309,7 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorN }) } +#[cfg(not(madsim))] #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorNext<'a>( env: EnvParam<'a>, @@ -822,6 +827,43 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowClose<'a>( pointer.drop() } +/// Send messages to the channel received by `CdcSplitReader`. +/// If msg is null, just check whether the channel is closed. +/// Return true if sending is successful, otherwise, return false so that caller can stop +/// gracefully. +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel<'a>( + env: EnvParam<'a>, + channel: Pointer<'a, GetEventStreamJniSender>, + msg: JByteArray<'a>, +) -> jboolean { + execute_and_catch(env, move |env| { + // If msg is null means just check whether channel is closed. + if msg.is_null() { + if channel.as_ref().is_closed() { + return Ok(JNI_FALSE); + } else { + return Ok(JNI_TRUE); + } + } + + let get_event_stream_response: GetEventStreamResponse = + Message::decode(to_guarded_slice(&msg, env)?.deref())?; + + tracing::debug!("before send"); + match channel.as_ref().blocking_send(get_event_stream_response) { + Ok(_) => { + tracing::debug!("send successfully"); + Ok(JNI_TRUE) + } + Err(e) => { + tracing::debug!("send error. {:?}", e); + Ok(JNI_FALSE) + } + } + }) +} + #[cfg(test)] mod tests { use risingwave_common::types::{DataType, Timestamptz}; diff --git a/src/risedevtool/config/src/main.rs b/src/risedevtool/config/src/main.rs index 2b1a4968d6195..931d6128647cb 100644 --- a/src/risedevtool/config/src/main.rs +++ b/src/risedevtool/config/src/main.rs @@ -382,6 +382,12 @@ fn main() -> Result<()> { )?; if chosen.contains(&component) { writeln!(file, "{}=true", component.env())?; + if component == Components::BuildConnectorNode { + writeln!( + file, + "CONNECTOR_LIBS_PATH=.risingwave/bin/connector-node/libs/" + )?; + } } else { writeln!(file, "# {}=true", component.env())?; } diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 3d9d4278a024f..8db097d709bbd 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -51,6 +51,7 @@ hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features hyper = { version = "0.14", features = ["full"] } indexmap = { version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } +jni = { version = "0.21", features = ["invocation"] } lexical-core = { version = "0.8", features = ["format"] } lexical-parse-float = { version = "0.8", default-features = false, features = ["format", "std"] } lexical-parse-integer = { version = "0.8", default-features = false, features = ["format", "std"] } @@ -148,6 +149,7 @@ hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features hyper = { version = "0.14", features = ["full"] } indexmap = { version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } +jni = { version = "0.21", features = ["invocation"] } lexical-core = { version = "0.8", features = ["format"] } lexical-parse-float = { version = "0.8", default-features = false, features = ["format", "std"] } lexical-parse-integer = { version = "0.8", default-features = false, features = ["format", "std"] }