Skip to content

Commit

Permalink
Revert "feat(connector): init embedded connector node (#12122)"
Browse files Browse the repository at this point in the history
This reverts commit 62901e1.
  • Loading branch information
StrikeW committed Sep 24, 2023
1 parent f400b52 commit 4a26072
Show file tree
Hide file tree
Showing 15 changed files with 41 additions and 561 deletions.
15 changes: 0 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ source ci/scripts/common.sh

# prepare environment
export CONNECTOR_RPC_ENDPOINT="localhost:50051"
export CONNECTOR_LIBS_PATH="./connector-node/libs"

while getopts 'p:' opt; do
case ${opt} in
Expand Down
2 changes: 0 additions & 2 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ COPY --from=builder /risingwave/bin/jeprof /usr/local/bin/jeprof
ENV PLAYGROUND_PROFILE docker-playground
# Set default dashboard UI to local path instead of github proxy
ENV RW_DASHBOARD_UI_PATH /risingwave/ui
# Set default connector libs path
ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs

ENTRYPOINT [ "/risingwave/bin/risingwave" ]
CMD [ "playground" ]
2 changes: 0 additions & 2 deletions docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ ENV CLASSPATH ${HADOOP_CONF_DIR}:${CLASSPATH}
ENV PLAYGROUND_PROFILE docker-playground
# Set default dashboard UI to local path instead of github proxy
ENV RW_DASHBOARD_UI_PATH /risingwave/ui
# Set default connector libs path
ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs

ENTRYPOINT [ "/risingwave/hdfs_env.sh" ]
CMD [ "playground" ]
8 changes: 0 additions & 8 deletions java/com_risingwave_java_binding_Binding.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,6 @@ public static CdcEngineRunner newCdcEngineRunner(
return runner;
}

public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) {
DbzCdcEngineRunner runner = null;
try {
var sourceId = config.getSourceId();
var engine =
new DbzCdcEngine(
config.getSourceId(),
config.getResolvedDebeziumProps(),
(success, message, error) -> {
if (!success) {
LOG.error(
"engine#{} terminated with error. message: {}",
sourceId,
message,
error);
} else {
LOG.info("engine#{} stopped normally. {}", sourceId, message);
}
});

runner = new DbzCdcEngineRunner(engine);
} catch (Exception e) {
LOG.error("failed to create the CDC engine", e);
}
return runner;
}

/** Start to run the cdc engine */
public void start() {
if (isRunning()) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@
import io.questdb.jar.jni.JarJniLoader;

public class Binding {
private static final boolean IS_EMBEDDED_CONNECTOR =
Boolean.parseBoolean(System.getProperty("is_embedded_connector"));

static {
if (!IS_EMBEDDED_CONNECTOR) {
JarJniLoader.loadLib(Binding.class, "/risingwave/jni", "risingwave_java_binding");
}
JarJniLoader.loadLib(Binding.class, "/risingwave/jni", "risingwave_java_binding");
}

public static native int vnodeCount();
Expand Down Expand Up @@ -89,6 +84,4 @@ public class Binding {
static native void streamChunkIteratorClose(long pointer);

static native long streamChunkIteratorFromPretty(String str);

public static native boolean sendCdcSourceMsgToChannel(long channelPtr, byte[] msg);
}
2 changes: 0 additions & 2 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ hyper = "0.14"
hyper-tls = "0.5"
icelake = { workspace = true }
itertools = "0.11"
jni = { version = "0.21.1", features = ["invocation"] }
jsonschema-transpiler = { git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
moka = { version = "0.11", features = ["future"] }
Expand Down Expand Up @@ -90,7 +89,6 @@ rdkafka = { workspace = true, features = [
] }
reqwest = { version = "0.11", features = ["json"] }
risingwave_common = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
rust_decimal = "1"
Expand Down
88 changes: 34 additions & 54 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,13 @@
// limitations under the License.

use std::str::FromStr;
use std::sync::LazyLock;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use futures::pin_mut;
use futures_async_stream::try_stream;
use itertools::Itertools;
use jni::objects::JValue;
use prost::Message;
use risingwave_common::util::addr::HostAddr;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_jni_core::GetEventStreamJniSender;
use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse};
use tokio::sync::mpsc;
use risingwave_pb::connector_service::GetEventStreamResponse;

use crate::parser::ParserConfig;
use crate::source::base::SourceMessage;
Expand All @@ -49,8 +43,6 @@ pub struct CdcSplitReader<T: CdcSourceTypeTrait> {
source_ctx: SourceContextRef,
}

const DEFAULT_CHANNEL_SIZE: usize = 16;

#[async_trait]
impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
type Properties = CdcProperties<T>;
Expand Down Expand Up @@ -101,6 +93,10 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn into_data_stream(self) {
let cdc_client = self.source_ctx.connector_client.clone().ok_or_else(|| {
anyhow!("connector node endpoint not specified or unable to connect to connector node")
})?;

// rewrite the hostname and port for the split
let mut properties = self.conn_props.props.clone();

Expand All @@ -119,54 +115,38 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
properties.insert("table.name".into(), table_name);
}

let (tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

LazyLock::force(&JVM).as_ref()?;

let get_event_stream_request = GetEventStreamRequest {
source_id: self.source_id,
source_type: self.conn_props.get_source_type_pb() as _,
start_offset: self.start_offset.unwrap_or_default(),
properties,
snapshot_done: self.snapshot_done,
};

std::thread::spawn(move || {
let mut env = JVM
.as_ref()
.unwrap()
.attach_current_thread_as_daemon()
.unwrap();

let get_event_stream_request_bytes = env
.byte_array_from_slice(&Message::encode_to_vec(&get_event_stream_request))
.unwrap();
let result = env.call_static_method(
"com/risingwave/connector/source/core/JniDbzSourceHandler",
"runJniDbzSourceThread",
"([BJ)V",
&[
JValue::Object(&get_event_stream_request_bytes),
JValue::from(&tx as *const GetEventStreamJniSender as i64),
],
);

match result {
Ok(_) => {
tracing::info!("end of jni call runJniDbzSourceThread");
let cdc_stream = cdc_client
.start_source_stream(
self.source_id,
self.conn_props.get_source_type_pb(),
self.start_offset,
properties,
self.snapshot_done,
)
.await
.inspect_err(|err| tracing::error!("connector node start stream error: {}", err))?;
pin_mut!(cdc_stream);
#[for_await]
for event_res in cdc_stream {
match event_res {
Ok(GetEventStreamResponse { events, .. }) => {
if events.is_empty() {
continue;
}
let mut msgs = Vec::with_capacity(events.len());
for event in events {
msgs.push(SourceMessage::from(event));
}
yield msgs;
}
Err(e) => {
tracing::error!("jni call error: {:?}", e);
return Err(anyhow!(
"Cdc service error: code {}, msg {}",
e.code(),
e.message()
))
}
}
});

while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await {
tracing::debug!("receive events {:?}", events.len());
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
}

Err(anyhow!("all senders are dropped"))?;
}
}
Loading

0 comments on commit 4a26072

Please sign in to comment.