Skip to content

Commit

Permalink
add cdcJniChannelClose
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Sep 8, 2023
1 parent 547941b commit 67f0e72
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 6 deletions.
8 changes: 8 additions & 0 deletions java/com_risingwave_java_binding_Binding.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.java.binding.Binding;
import com.risingwave.java.binding.CdcJniChannel;
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.proto.ConnectorServiceProto;
import java.util.HashMap;
Expand Down Expand Up @@ -64,7 +65,7 @@ public void start(long channelPtr) {
return;
}

try {
try (CdcJniChannel channel = new CdcJniChannel(channelPtr)) {
// Start the engine
runner.start();
LOG.info("Start consuming events of table {}", config.getSourceId());
Expand All @@ -83,10 +84,12 @@ public void start(long channelPtr) {
"Engine#{}: emit one chunk {} events to network ",
config.getSourceId(),
resp.getEventsCount());
success = Binding.sendCdcSourceMsgToChannel(channelPtr, resp.toByteArray());
success =
Binding.sendCdcSourceMsgToChannel(
channel.getPointer(), resp.toByteArray());
} else {
// If resp is null means just check whether channel is closed.
success = Binding.sendCdcSourceMsgToChannel(channelPtr, null);
success = Binding.sendCdcSourceMsgToChannel(channel.getPointer(), null);
}
if (!success) {
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,6 @@ public class Binding {
static native long streamChunkIteratorFromPretty(String str);

public static native boolean sendCdcSourceMsgToChannel(long channelPtr, byte[] msg);

static native void cdcJniChannelClose(long pointer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.risingwave.java.binding;

public class CdcJniChannel implements AutoCloseable {
private final long pointer;
private boolean isClosed;

public CdcJniChannel(long pointer) {
this.pointer = pointer;
this.isClosed = false;
}

public long getPointer() {
return this.pointer;
}

@Override
public void close() {
if (!isClosed) {
isClosed = true;
Binding.cdcJniChannelClose(pointer);
}
}
}
6 changes: 6 additions & 0 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,12 @@ fn register_native_method_for_jvm(jvm: &JavaVM) {
fn_ptr: crate::Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel
as *mut c_void,
},
NativeMethod {
name: JNIString::from("cdcJniChannelClose"),
sig: JNIString::from("(J)V"),
fn_ptr: crate::Java_com_risingwave_java_binding_Binding_cdcJniChannelClose
as *mut c_void,
},
],
)
.inspect_err(|e| tracing::error!("jvm register native methods error: {:?}", e))
Expand Down
11 changes: 8 additions & 3 deletions src/jni_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,8 +842,6 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceMsg
// If msg is null means just check whether channel is closed.
if msg.is_null() {
if channel.as_ref().is_closed() {
// Drop channel as well.
channel.drop();
return Ok(JNI_FALSE);
} else {
return Ok(JNI_TRUE);
Expand All @@ -860,14 +858,21 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceMsg
Ok(JNI_TRUE)
}
Err(e) => {
channel.drop();
tracing::debug!("send error. {:?}", e);
Ok(JNI_FALSE)
}
}
})
}

#[no_mangle]
pub extern "system" fn Java_com_risingwave_java_binding_Binding_cdcJniChannelClose<'a>(
_env: EnvParam<'a>,
pointer: Pointer<'a, GetEventStreamJniSender>,
) {
pointer.drop()
}

#[cfg(test)]
mod tests {
use risingwave_common::types::{DataType, Timestamptz};
Expand Down

0 comments on commit 67f0e72

Please sign in to comment.