Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connector): init embedded connector node #12122

Merged
merged 56 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
2904555
test jni rs
chenzl25 Aug 22, 2023
2831bad
CHANNEL POOL
chenzl25 Aug 23, 2023
8e3fc67
support jni create cdc source
chenzl25 Aug 24, 2023
bea170f
fmt
chenzl25 Aug 25, 2023
768776d
fmt
chenzl25 Aug 25, 2023
03664e9
fmt
chenzl25 Aug 25, 2023
37e530e
remove cdylib
chenzl25 Aug 25, 2023
89d421f
fmt
chenzl25 Aug 25, 2023
61f5463
use never inline for run_this_func_to_get_valid_ptr_from_java_binding
chenzl25 Aug 25, 2023
1f2d3ae
refine sendMsgToChannel
chenzl25 Aug 28, 2023
2e3c93f
add copyright
fuyufjh Aug 28, 2023
41583db
support resouce reclamation
chenzl25 Aug 28, 2023
3041df2
drop channel pointer properly
chenzl25 Aug 28, 2023
2dbbb51
resolve conflicts
chenzl25 Aug 31, 2023
ef0f0c8
serialize proto msg in jni
chenzl25 Aug 31, 2023
a940f8d
refactor
chenzl25 Sep 1, 2023
d0ef67f
first version of embedded connector node
chenzl25 Sep 4, 2023
df9fc72
refine
chenzl25 Sep 4, 2023
071b96d
better naming
chenzl25 Sep 5, 2023
738a2dd
resolve conflict
chenzl25 Sep 6, 2023
d9562ff
remove jvm related codes
chenzl25 Sep 6, 2023
4f4bb7f
remove bin in jni_core
chenzl25 Sep 6, 2023
018b332
refine cargo toml
chenzl25 Sep 6, 2023
25e1a8a
add necessary dependencies to java_binding
chenzl25 Sep 6, 2023
8b5b94a
fmt
chenzl25 Sep 6, 2023
41bd111
Revert "fmt"
chenzl25 Sep 6, 2023
4b2089f
Revert "add necessary dependencies to java_binding"
chenzl25 Sep 6, 2023
35ab88a
Revert "refine cargo toml"
chenzl25 Sep 6, 2023
8bcce49
Revert "remove jvm related codes"
chenzl25 Sep 6, 2023
e16d040
fmt
chenzl25 Sep 6, 2023
cfa9e0e
resolve conflicts
chenzl25 Sep 6, 2023
5017dff
set up env to init JVM
chenzl25 Sep 6, 2023
5a1bd7f
simplify runJniDbzSourceThread
chenzl25 Sep 6, 2023
fb87e9c
refine
chenzl25 Sep 6, 2023
0d7f770
prepare CONNECTOR_LIBS_PATH for e2e source tests
chenzl25 Sep 7, 2023
336aae6
workaround madsim
chenzl25 Sep 7, 2023
3b4233f
refactor
chenzl25 Sep 7, 2023
079520b
use Result instead of Option
chenzl25 Sep 7, 2023
f55a545
fix
chenzl25 Sep 7, 2023
a0901d2
Merge branch 'main' into dylan/init_embedded_connector_node
chenzl25 Sep 7, 2023
170df20
resolve conflicts
chenzl25 Sep 8, 2023
a8d54ce
set CONNECTOR_LIBS_PATH for risedev
chenzl25 Sep 8, 2023
d5ee0fb
remove func
chenzl25 Sep 8, 2023
ae1327d
refactor
chenzl25 Sep 8, 2023
3a39f98
fmt
chenzl25 Sep 8, 2023
adc1a8d
use LazyLock::force
chenzl25 Sep 8, 2023
547941b
refactor JniDbzSourceHandler
chenzl25 Sep 8, 2023
67f0e72
add cdcJniChannelClose
chenzl25 Sep 8, 2023
cfa8ac6
Update java/java-binding/src/main/java/com/risingwave/java/binding/Cd…
chenzl25 Sep 8, 2023
0d87139
resolve conflicts
chenzl25 Sep 11, 2023
366eba9
refine
chenzl25 Sep 11, 2023
dd0142b
drop channel pointer in rust side
chenzl25 Sep 11, 2023
c184ade
Merge branch 'main' into dylan/init_embedded_connector_node
chenzl25 Sep 11, 2023
7208816
Merge branch 'main' into dylan/init_embedded_connector_node
chenzl25 Sep 11, 2023
e6fbfde
refine
chenzl25 Sep 12, 2023
ba14875
Merge branch 'main' into dylan/init_embedded_connector_node
chenzl25 Sep 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ source ci/scripts/common.sh

# prepare environment
export CONNECTOR_RPC_ENDPOINT="localhost:50051"
export CONNECTOR_LIBS_PATH="./connector-node/libs"

while getopts 'p:' opt; do
case ${opt} in
Expand Down
8 changes: 8 additions & 0 deletions java/com_risingwave_java_binding_Binding.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,33 @@ public static CdcEngineRunner newCdcEngineRunner(
return runner;
}

public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) {
DbzCdcEngineRunner runner = null;
try {
var sourceId = config.getSourceId();
var engine =
new DbzCdcEngine(
config.getSourceId(),
config.getResolvedDebeziumProps(),
(success, message, error) -> {
if (!success) {
LOG.error(
"engine#{} terminated with error. message: {}",
sourceId,
message,
error);
} else {
LOG.info("engine#{} stopped normally. {}", sourceId, message);
}
});

runner = new DbzCdcEngineRunner(engine);
} catch (Exception e) {
LOG.error("failed to create the CDC engine", e);
}
return runner;
}

/** Start to run the cdc engine */
public void start() {
if (isRunning()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector.source.core;

import com.risingwave.connector.api.source.CdcEngineRunner;
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.java.binding.Binding;
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.proto.ConnectorServiceProto;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** handler for starting a debezium source connectors for jni */
public class JniDbzSourceHandler {
static final Logger LOG = LoggerFactory.getLogger(DbzSourceHandler.class);

private final DbzConnectorConfig config;

public JniDbzSourceHandler(DbzConnectorConfig config) {
this.config = config;
}

public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr)
throws com.google.protobuf.InvalidProtocolBufferException {

var request =
ConnectorServiceProto.GetEventStreamRequest.parseFrom(getEventStreamRequestBytes);

// For jni.rs
java.lang.Thread.currentThread()
.setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader());
// userProps extracted from grpc request, underlying implementation is UnmodifiableMap
Map<String, String> mutableUserProps = new HashMap<>(request.getPropertiesMap());
mutableUserProps.put("source.id", Long.toString(request.getSourceId()));
var config =
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
new DbzConnectorConfig(
SourceTypeE.valueOf(request.getSourceType()),
request.getSourceId(),
request.getStartOffset(),
mutableUserProps,
request.getSnapshotDone());
JniDbzSourceHandler handler = new JniDbzSourceHandler(config);
handler.start(channelPtr);
}

class OnReadyHandler implements Runnable {
private final CdcEngineRunner runner;
private final long channelPtr;

public OnReadyHandler(CdcEngineRunner runner, long channelPtr) {
this.runner = runner;
this.channelPtr = channelPtr;
}

@Override
public void run() {
while (runner.isRunning()) {
try {
// check whether the send queue has room for new messages
// Thread will block on the channel to get output from engine
var resp =
runner.getEngine().getOutputChannel().poll(500, TimeUnit.MILLISECONDS);
boolean success;
if (resp != null) {
ConnectorNodeMetrics.incSourceRowsReceived(
config.getSourceType().toString(),
String.valueOf(config.getSourceId()),
resp.getEventsCount());
LOG.info(
"Engine#{}: emit one chunk {} events to network ",
config.getSourceId(),
resp.getEventsCount());
success = Binding.sendCdcSourceMsgToChannel(channelPtr, resp.toByteArray());
} else {
// If resp is null means just check whether channel is closed.
success = Binding.sendCdcSourceMsgToChannel(channelPtr, null);
}
if (!success) {
LOG.info(
"Engine#{}: JNI sender broken detected, stop the engine",
config.getSourceId());
runner.stop();
return;
}
} catch (Throwable e) {
LOG.error("Poll engine output channel fail. ", e);
}
}
}
}

public void start(long channelPtr) {
var runner = DbzCdcEngineRunner.newCdcEngineRunner(config);
if (runner == null) {
return;
}

try {
// Start the engine
runner.start();
LOG.info("Start consuming events of table {}", config.getSourceId());

final OnReadyHandler onReadyHandler = new OnReadyHandler(runner, channelPtr);
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved

onReadyHandler.run();

} catch (Throwable t) {
LOG.error("Cdc engine failed.", t);
try {
runner.stop();
} catch (Exception e) {
LOG.warn("Failed to stop Engine#{}", config.getSourceId(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
import io.questdb.jar.jni.JarJniLoader;

public class Binding {
private static final boolean IS_EMBEDDED_CONNECTOR =
Boolean.parseBoolean(System.getProperty("is_embedded_connector"));

static {
JarJniLoader.loadLib(Binding.class, "/risingwave/jni", "risingwave_java_binding");
if (!IS_EMBEDDED_CONNECTOR) {
JarJniLoader.loadLib(Binding.class, "/risingwave/jni", "risingwave_java_binding");
}
}

public static native int vnodeCount();
Expand Down Expand Up @@ -84,4 +89,6 @@ public class Binding {
static native void streamChunkIteratorClose(long pointer);

static native long streamChunkIteratorFromPretty(String str);

public static native boolean sendCdcSourceMsgToChannel(long channelPtr, byte[] msg);
}
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ hyper = "0.14"
hyper-tls = "0.5"
icelake = { workspace = true }
itertools = "0.11"
jni = { version = "0.21.1", features = ["invocation"] }
jsonschema-transpiler = "1.10.0"
maplit = "1.0.2"
moka = { version = "0.11", features = ["future"] }
Expand Down Expand Up @@ -82,6 +83,7 @@ rdkafka = { workspace = true, features = [
] }
reqwest = { version = "0.11", features = ["json"] }
risingwave_common = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
rust_decimal = "1"
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use anyhow::anyhow;
use async_trait::async_trait;
use itertools::Itertools;
use risingwave_common::util::addr::HostAddr;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_pb::connector_service::SourceType as PbSourceType;

use crate::source::cdc::{
Expand Down Expand Up @@ -60,6 +61,7 @@ impl SplitEnumerator for DebeziumSplitEnumerator {
.unwrap_or_default();

let source_type = props.get_source_type_pb()?;
JVM.as_ref()?;
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
// validate connector properties
connector_client
.validate_source_properties(
Expand Down
Loading