Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connector): init embedded connector node #12122

Merged
merged 56 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
2904555
test jni rs
chenzl25 Aug 22, 2023
2831bad
CHANNEL POOL
chenzl25 Aug 23, 2023
8e3fc67
support jni create cdc source
chenzl25 Aug 24, 2023
bea170f
fmt
chenzl25 Aug 25, 2023
768776d
fmt
chenzl25 Aug 25, 2023
03664e9
fmt
chenzl25 Aug 25, 2023
37e530e
remove cdylib
chenzl25 Aug 25, 2023
89d421f
fmt
chenzl25 Aug 25, 2023
61f5463
use never inline for run_this_func_to_get_valid_ptr_from_java_binding
chenzl25 Aug 25, 2023
1f2d3ae
refine sendMsgToChannel
chenzl25 Aug 28, 2023
2e3c93f
add copyright
fuyufjh Aug 28, 2023
41583db
support resouce reclamation
chenzl25 Aug 28, 2023
3041df2
drop channel pointer properly
chenzl25 Aug 28, 2023
2dbbb51
resolve conflicts
chenzl25 Aug 31, 2023
ef0f0c8
serialize proto msg in jni
chenzl25 Aug 31, 2023
a940f8d
refactor
chenzl25 Sep 1, 2023
d0ef67f
first version of embedded connector node
chenzl25 Sep 4, 2023
df9fc72
refine
chenzl25 Sep 4, 2023
071b96d
better naming
chenzl25 Sep 5, 2023
738a2dd
resolve conflict
chenzl25 Sep 6, 2023
d9562ff
remove jvm related codes
chenzl25 Sep 6, 2023
4f4bb7f
remove bin in jni_core
chenzl25 Sep 6, 2023
018b332
refine cargo toml
chenzl25 Sep 6, 2023
25e1a8a
add necessary dependencies to java_binding
chenzl25 Sep 6, 2023
8b5b94a
fmt
chenzl25 Sep 6, 2023
41bd111
Revert "fmt"
chenzl25 Sep 6, 2023
4b2089f
Revert "add necessary dependencies to java_binding"
chenzl25 Sep 6, 2023
35ab88a
Revert "refine cargo toml"
chenzl25 Sep 6, 2023
8bcce49
Revert "remove jvm related codes"
chenzl25 Sep 6, 2023
e16d040
fmt
chenzl25 Sep 6, 2023
cfa9e0e
resolve conflicts
chenzl25 Sep 6, 2023
5017dff
set up env to init JVM
chenzl25 Sep 6, 2023
5a1bd7f
simplify runJniDbzSourceThread
chenzl25 Sep 6, 2023
fb87e9c
refine
chenzl25 Sep 6, 2023
0d7f770
prepare CONNECTOR_LIBS_PATH for e2e source tests
chenzl25 Sep 7, 2023
336aae6
workaround madsim
chenzl25 Sep 7, 2023
3b4233f
refactor
chenzl25 Sep 7, 2023
079520b
use Result instead of Option
chenzl25 Sep 7, 2023
f55a545
fix
chenzl25 Sep 7, 2023
a0901d2
Merge branch 'main' into dylan/init_embedded_connector_node
chenzl25 Sep 7, 2023
170df20
resolve conflicts
chenzl25 Sep 8, 2023
a8d54ce
set CONNECTOR_LIBS_PATH for risedev
chenzl25 Sep 8, 2023
d5ee0fb
remove func
chenzl25 Sep 8, 2023
ae1327d
refactor
chenzl25 Sep 8, 2023
3a39f98
fmt
chenzl25 Sep 8, 2023
adc1a8d
use LazyLock::force
chenzl25 Sep 8, 2023
547941b
refactor JniDbzSourceHandler
chenzl25 Sep 8, 2023
67f0e72
add cdcJniChannelClose
chenzl25 Sep 8, 2023
cfa8ac6
Update java/java-binding/src/main/java/com/risingwave/java/binding/Cd…
chenzl25 Sep 8, 2023
0d87139
resolve conflicts
chenzl25 Sep 11, 2023
366eba9
refine
chenzl25 Sep 11, 2023
dd0142b
drop channel pointer in rust side
chenzl25 Sep 11, 2023
c184ade
Merge branch 'main' into dylan/init_embedded_connector_node
chenzl25 Sep 11, 2023
7208816
Merge branch 'main' into dylan/init_embedded_connector_node
chenzl25 Sep 11, 2023
e6fbfde
refine
chenzl25 Sep 12, 2023
ba14875
Merge branch 'main' into dylan/init_embedded_connector_node
chenzl25 Sep 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ COPY --from=builder /risingwave/bin/jeprof /usr/local/bin/jeprof
ENV PLAYGROUND_PROFILE docker-playground
# Set default dashboard UI to local path instead of github proxy
ENV RW_DASHBOARD_UI_PATH /risingwave/ui
# Set default connector libs path
ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs
Comment on lines +67 to +68
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add default CONNECTOR_LIBS_PATH to the Dockerfile.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I miss any other Dockerfiles? cc @fuyufjh @BugenZhao

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK this is the only one


ENTRYPOINT [ "/risingwave/bin/risingwave" ]
CMD [ "playground" ]
2 changes: 2 additions & 0 deletions docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ ENV CLASSPATH ${HADOOP_CONF_DIR}:${CLASSPATH}
ENV PLAYGROUND_PROFILE docker-playground
# Set default dashboard UI to local path instead of github proxy
ENV RW_DASHBOARD_UI_PATH /risingwave/ui
# Set default connector libs path
ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs
Comment on lines +91 to +92
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add default CONNECTOR_LIBS_PATH to the Dockerfile.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, opandal also uses jni to start a jvm when supporting hdfs access. May need to test whether it will conflict with the embedded connector node. cc @wcy-fdu


ENTRYPOINT [ "/risingwave/hdfs_env.sh" ]
CMD [ "playground" ]
8 changes: 0 additions & 8 deletions java/com_risingwave_java_binding_Binding.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.java.binding.Binding;
import com.risingwave.java.binding.CdcJniChannel;
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.proto.ConnectorServiceProto;
import java.util.HashMap;
Expand All @@ -38,27 +37,24 @@ public JniDbzSourceHandler(DbzConnectorConfig config) {

public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr)
throws com.google.protobuf.InvalidProtocolBufferException {
try (CdcJniChannel channel = new CdcJniChannel(channelPtr)) {
var request =
ConnectorServiceProto.GetEventStreamRequest.parseFrom(
getEventStreamRequestBytes);
var request =
ConnectorServiceProto.GetEventStreamRequest.parseFrom(getEventStreamRequestBytes);

// For jni.rs
java.lang.Thread.currentThread()
.setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader());
// userProps extracted from request, underlying implementation is UnmodifiableMap
Map<String, String> mutableUserProps = new HashMap<>(request.getPropertiesMap());
mutableUserProps.put("source.id", Long.toString(request.getSourceId()));
var config =
new DbzConnectorConfig(
SourceTypeE.valueOf(request.getSourceType()),
request.getSourceId(),
request.getStartOffset(),
mutableUserProps,
request.getSnapshotDone());
JniDbzSourceHandler handler = new JniDbzSourceHandler(config);
handler.start(channel.getPointer());
}
// For jni.rs
java.lang.Thread.currentThread()
.setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader());
// userProps extracted from request, underlying implementation is UnmodifiableMap
Map<String, String> mutableUserProps = new HashMap<>(request.getPropertiesMap());
mutableUserProps.put("source.id", Long.toString(request.getSourceId()));
var config =
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
new DbzConnectorConfig(
SourceTypeE.valueOf(request.getSourceType()),
request.getSourceId(),
request.getStartOffset(),
mutableUserProps,
request.getSnapshotDone());
JniDbzSourceHandler handler = new JniDbzSourceHandler(config);
handler.start(channelPtr);
}

public void start(long channelPtr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,4 @@ public class Binding {
static native long streamChunkIteratorFromPretty(String str);

public static native boolean sendCdcSourceMsgToChannel(long channelPtr, byte[] msg);

static native void cdcJniChannelClose(long pointer);
}

This file was deleted.

33 changes: 21 additions & 12 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,24 +151,33 @@ where

let channel_ptr = Box::into_raw(tx) as i64;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For current code, we don't even need wrapping the channel with box, since the tx variable is dropped after the method is called. We can pass channel_ptr as &tx as *const Sender as i64.


let _ = env
.call_static_method(
"com/risingwave/connector/source/core/JniDbzSourceHandler",
"runJniDbzSourceThread",
"([BJ)V",
&[
JValue::Object(&get_event_stream_request_bytes),
JValue::from(channel_ptr),
],
)
.inspect_err(|e| tracing::error!("jni call error: {:?}", e))
.unwrap();
let result = env.call_static_method(
"com/risingwave/connector/source/core/JniDbzSourceHandler",
"runJniDbzSourceThread",
"([BJ)V",
&[
JValue::Object(&get_event_stream_request_bytes),
JValue::from(channel_ptr),
],
);

unsafe { drop(Box::from_raw(channel_ptr as *mut GetEventStreamJniSender)) };
match result {
Ok(_) => {
tracing::info!("end of jni call runJniDbzSourceThread");
}
Err(e) => {
tracing::error!("jni call error: {:?}", e);
}
}
});

while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await {
tracing::debug!("receive events {:?}", events.len());
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
}

Err(anyhow!("all senders are dropped"))?;
}
}
6 changes: 0 additions & 6 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,6 @@ fn register_native_method_for_jvm(jvm: &JavaVM) {
fn_ptr: crate::Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel
as *mut c_void,
},
NativeMethod {
name: JNIString::from("cdcJniChannelClose"),
sig: JNIString::from("(J)V"),
fn_ptr: crate::Java_com_risingwave_java_binding_Binding_cdcJniChannelClose
as *mut c_void,
},
],
)
.inspect_err(|e| tracing::error!("jvm register native methods error: {:?}", e))
Expand Down
8 changes: 0 additions & 8 deletions src/jni_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,14 +864,6 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceMsg
})
}

#[no_mangle]
pub extern "system" fn Java_com_risingwave_java_binding_Binding_cdcJniChannelClose<'a>(
_env: EnvParam<'a>,
pointer: Pointer<'a, GetEventStreamJniSender>,
) {
pointer.drop()
}

#[cfg(test)]
mod tests {
use risingwave_common::types::{DataType, Timestamptz};
Expand Down
Loading