Skip to content

Commit

Permalink
feat(connector): init embedded connector node (#12122)
Browse files Browse the repository at this point in the history
Co-authored-by: Eric Fu <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored and Li0k committed Sep 15, 2023
1 parent 14e32b7 commit fb0622d
Show file tree
Hide file tree
Showing 15 changed files with 561 additions and 41 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ source ci/scripts/common.sh

# prepare environment
export CONNECTOR_RPC_ENDPOINT="localhost:50051"
export CONNECTOR_LIBS_PATH="./connector-node/libs"

while getopts 'p:' opt; do
case ${opt} in
Expand Down
2 changes: 2 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ COPY --from=builder /risingwave/bin/jeprof /usr/local/bin/jeprof
ENV PLAYGROUND_PROFILE docker-playground
# Set default dashboard UI to local path instead of github proxy
ENV RW_DASHBOARD_UI_PATH /risingwave/ui
# Set default connector libs path
ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs

ENTRYPOINT [ "/risingwave/bin/risingwave" ]
CMD [ "playground" ]
2 changes: 2 additions & 0 deletions docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ ENV CLASSPATH ${HADOOP_CONF_DIR}:${CLASSPATH}
ENV PLAYGROUND_PROFILE docker-playground
# Set default dashboard UI to local path instead of github proxy
ENV RW_DASHBOARD_UI_PATH /risingwave/ui
# Set default connector libs path
ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs

ENTRYPOINT [ "/risingwave/hdfs_env.sh" ]
CMD [ "playground" ]
8 changes: 8 additions & 0 deletions java/com_risingwave_java_binding_Binding.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,33 @@ public static CdcEngineRunner newCdcEngineRunner(
return runner;
}

/**
 * Builds a CDC engine runner directly from a connector config.
 *
 * <p>Any exception raised while constructing the engine or the runner is logged and swallowed; in
 * that case {@code null} is returned, so callers must null-check the result.
 *
 * @param config connector config supplying the source id and the resolved Debezium properties
 * @return the new runner, or {@code null} if construction failed
 */
public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) {
    try {
        // Captured by the completion callback below so its log lines identify the engine.
        final var sourceId = config.getSourceId();
        final var engine =
                new DbzCdcEngine(
                        config.getSourceId(),
                        config.getResolvedDebeziumProps(),
                        (success, message, error) -> {
                            if (success) {
                                LOG.info("engine#{} stopped normally. {}", sourceId, message);
                            } else {
                                LOG.error(
                                        "engine#{} terminated with error. message: {}",
                                        sourceId,
                                        message,
                                        error);
                            }
                        });
        return new DbzCdcEngineRunner(engine);
    } catch (Exception e) {
        LOG.error("failed to create the CDC engine", e);
        return null;
    }
}

/** Start to run the cdc engine */
public void start() {
if (isRunning()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector.source.core;

import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.java.binding.Binding;
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.proto.ConnectorServiceProto;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Handler that starts a Debezium source connector and forwards its events over JNI. */
public class JniDbzSourceHandler {
    // Use this class's own logger so log lines are attributed to the JNI handler
    // rather than to DbzSourceHandler.
    static final Logger LOG = LoggerFactory.getLogger(JniDbzSourceHandler.class);

    private final DbzConnectorConfig config;

    public JniDbzSourceHandler(DbzConnectorConfig config) {
        this.config = config;
    }

    /**
     * Entry point invoked through JNI: decodes the request, builds the connector config and runs
     * the source loop on the calling thread until it finishes.
     *
     * @param getEventStreamRequestBytes a serialized {@code GetEventStreamRequest} protobuf message
     * @param channelPtr opaque handle passed through unchanged to {@link
     *     Binding#sendCdcSourceMsgToChannel}
     * @throws com.google.protobuf.InvalidProtocolBufferException if the request bytes cannot be
     *     parsed
     */
    public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr)
            throws com.google.protobuf.InvalidProtocolBufferException {
        var request =
                ConnectorServiceProto.GetEventStreamRequest.parseFrom(getEventStreamRequestBytes);

        // For jni.rs: make the system class loader this thread's context class loader.
        // NOTE(review): presumably needed because the thread is attached from native code
        // -- confirm.
        java.lang.Thread.currentThread()
                .setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader());
        // userProps extracted from request, underlying implementation is UnmodifiableMap,
        // so copy it before adding the source id.
        Map<String, String> mutableUserProps = new HashMap<>(request.getPropertiesMap());
        mutableUserProps.put("source.id", Long.toString(request.getSourceId()));
        var config =
                new DbzConnectorConfig(
                        SourceTypeE.valueOf(request.getSourceType()),
                        request.getSourceId(),
                        request.getStartOffset(),
                        mutableUserProps,
                        request.getSnapshotDone());
        JniDbzSourceHandler handler = new JniDbzSourceHandler(config);
        handler.start(channelPtr);
    }

    /**
     * Runs the CDC engine and pushes every batch it produces to the channel identified by {@code
     * channelPtr}. Returns when the runner stops, when the channel reports failure, or after an
     * error has been logged. Returns immediately if the runner could not be created.
     *
     * @param channelPtr opaque handle passed through unchanged to {@link
     *     Binding#sendCdcSourceMsgToChannel}
     */
    public void start(long channelPtr) {
        var runner = DbzCdcEngineRunner.newCdcEngineRunner(config);
        if (runner == null) {
            // Creation failed; there is nothing to run.
            return;
        }

        try {
            // Start the engine
            runner.start();
            LOG.info("Start consuming events of table {}", config.getSourceId());

            while (runner.isRunning()) {
                // Wait up to 500 ms on the engine's output channel; resp is null when the
                // poll yields nothing.
                var resp = runner.getEngine().getOutputChannel().poll(500, TimeUnit.MILLISECONDS);
                boolean success;
                if (resp != null) {
                    ConnectorNodeMetrics.incSourceRowsReceived(
                            config.getSourceType().toString(),
                            String.valueOf(config.getSourceId()),
                            resp.getEventsCount());
                    LOG.info(
                            "Engine#{}: emit one chunk {} events to network ",
                            config.getSourceId(),
                            resp.getEventsCount());
                    success = Binding.sendCdcSourceMsgToChannel(channelPtr, resp.toByteArray());
                } else {
                    // A null message only checks whether the channel is closed.
                    success = Binding.sendCdcSourceMsgToChannel(channelPtr, null);
                }
                if (!success) {
                    LOG.info(
                            "Engine#{}: JNI sender broken detected, stop the engine",
                            config.getSourceId());
                    runner.stop();
                    return;
                }
            }
        } catch (Throwable t) {
            LOG.error("Cdc engine failed.", t);
            // Best-effort shutdown; a failure to stop is logged but not propagated.
            try {
                runner.stop();
            } catch (Exception e) {
                LOG.warn("Failed to stop Engine#{}", config.getSourceId(), e);
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
import io.questdb.jar.jni.JarJniLoader;

public class Binding {
private static final boolean IS_EMBEDDED_CONNECTOR =
Boolean.parseBoolean(System.getProperty("is_embedded_connector"));

static {
JarJniLoader.loadLib(Binding.class, "/risingwave/jni", "risingwave_java_binding");
if (!IS_EMBEDDED_CONNECTOR) {
JarJniLoader.loadLib(Binding.class, "/risingwave/jni", "risingwave_java_binding");
}
}

public static native int vnodeCount();
Expand Down Expand Up @@ -84,4 +89,6 @@ public class Binding {
static native void streamChunkIteratorClose(long pointer);

static native long streamChunkIteratorFromPretty(String str);

public static native boolean sendCdcSourceMsgToChannel(long channelPtr, byte[] msg);
}
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ hyper = "0.14"
hyper-tls = "0.5"
icelake = { workspace = true }
itertools = "0.11"
jni = { version = "0.21.1", features = ["invocation"] }
jsonschema-transpiler = "1.10.0"
maplit = "1.0.2"
moka = { version = "0.11", features = ["future"] }
Expand Down Expand Up @@ -83,6 +84,7 @@ rdkafka = { workspace = true, features = [
] }
reqwest = { version = "0.11", features = ["json"] }
risingwave_common = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
rust_decimal = "1"
Expand Down
88 changes: 54 additions & 34 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@
// limitations under the License.

use std::str::FromStr;
use std::sync::LazyLock;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use futures::pin_mut;
use futures_async_stream::try_stream;
use itertools::Itertools;
use jni::objects::JValue;
use prost::Message;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::connector_service::GetEventStreamResponse;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_jni_core::GetEventStreamJniSender;
use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse};
use tokio::sync::mpsc;

use crate::parser::ParserConfig;
use crate::source::base::SourceMessage;
Expand All @@ -44,6 +50,8 @@ pub struct CdcSplitReader<T: CdcSourceTypeTrait> {
source_ctx: SourceContextRef,
}

const DEFAULT_CHANNEL_SIZE: usize = 16;

#[async_trait]
impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T>
where
Expand Down Expand Up @@ -99,10 +107,6 @@ where
{
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn into_data_stream(self) {
let cdc_client = self.source_ctx.connector_client.clone().ok_or_else(|| {
anyhow!("connector node endpoint not specified or unable to connect to connector node")
})?;

// rewrite the hostname and port for the split
let mut properties = self.conn_props.props.clone();

Expand All @@ -121,38 +125,54 @@ where
properties.insert("table.name".into(), table_name);
}

let cdc_stream = cdc_client
.start_source_stream(
self.source_id,
self.conn_props.get_source_type_pb(),
self.start_offset,
properties,
self.snapshot_done,
)
.await
.inspect_err(|err| tracing::error!("connector node start stream error: {}", err))?;
pin_mut!(cdc_stream);
#[for_await]
for event_res in cdc_stream {
match event_res {
Ok(GetEventStreamResponse { events, .. }) => {
if events.is_empty() {
continue;
}
let mut msgs = Vec::with_capacity(events.len());
for event in events {
msgs.push(SourceMessage::from(event));
}
yield msgs;
let (tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

LazyLock::force(&JVM).as_ref()?;

let get_event_stream_request = GetEventStreamRequest {
source_id: self.source_id,
source_type: self.conn_props.get_source_type_pb() as _,
start_offset: self.start_offset.unwrap_or_default(),
properties,
snapshot_done: self.snapshot_done,
};

std::thread::spawn(move || {
let mut env = JVM
.as_ref()
.unwrap()
.attach_current_thread_as_daemon()
.unwrap();

let get_event_stream_request_bytes = env
.byte_array_from_slice(&Message::encode_to_vec(&get_event_stream_request))
.unwrap();
let result = env.call_static_method(
"com/risingwave/connector/source/core/JniDbzSourceHandler",
"runJniDbzSourceThread",
"([BJ)V",
&[
JValue::Object(&get_event_stream_request_bytes),
JValue::from(&tx as *const GetEventStreamJniSender as i64),
],
);

match result {
Ok(_) => {
tracing::info!("end of jni call runJniDbzSourceThread");
}
Err(e) => {
return Err(anyhow!(
"Cdc service error: code {}, msg {}",
e.code(),
e.message()
))
tracing::error!("jni call error: {:?}", e);
}
}
});

while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await {
tracing::debug!("receive events {:?}", events.len());
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
}

Err(anyhow!("all senders are dropped"))?;
}
}
Loading

0 comments on commit fb0622d

Please sign in to comment.