-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(connector): init embedded connector node #12122
Changes from 53 commits
2904555
2831bad
8e3fc67
bea170f
768776d
03664e9
37e530e
89d421f
61f5463
1f2d3ae
2e3c93f
41583db
3041df2
2dbbb51
ef0f0c8
a940f8d
d0ef67f
df9fc72
071b96d
738a2dd
d9562ff
4f4bb7f
018b332
25e1a8a
8b5b94a
41bd111
4b2089f
35ab88a
8bcce49
e16d040
cfa9e0e
5017dff
5a1bd7f
fb87e9c
0d7f770
336aae6
3b4233f
079520b
f55a545
a0901d2
170df20
a8d54ce
d5ee0fb
ae1327d
3a39f98
adc1a8d
547941b
67f0e72
cfa8ac6
0d87139
366eba9
dd0142b
c184ade
7208816
e6fbfde
ba14875
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,6 +88,8 @@ ENV CLASSPATH ${HADOOP_CONF_DIR}:${CLASSPATH} | |
ENV PLAYGROUND_PROFILE docker-playground | ||
# Set default dashboard UI to local path instead of github proxy | ||
ENV RW_DASHBOARD_UI_PATH /risingwave/ui | ||
# Set default connector libs path | ||
ENV CONNECTOR_LIBS_PATH /risingwave/bin/connector-node/libs | ||
Comment on lines
+91
to
+92
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add default CONNECTOR_LIBS_PATH to the Dockerfile. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIRC, opandal also uses jni to start a jvm when supporting hdfs access. May need to test whether it will conflict with the embedded connector node. cc @wcy-fdu |
||
|
||
ENTRYPOINT [ "/risingwave/hdfs_env.sh" ] | ||
CMD [ "playground" ] |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
// Copyright 2023 RisingWave Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package com.risingwave.connector.source.core; | ||
|
||
import com.risingwave.connector.api.source.SourceTypeE; | ||
import com.risingwave.connector.source.common.DbzConnectorConfig; | ||
import com.risingwave.java.binding.Binding; | ||
import com.risingwave.metrics.ConnectorNodeMetrics; | ||
import com.risingwave.proto.ConnectorServiceProto; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeUnit; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** handler for starting a debezium source connectors for jni */ | ||
public class JniDbzSourceHandler { | ||
static final Logger LOG = LoggerFactory.getLogger(DbzSourceHandler.class); | ||
|
||
private final DbzConnectorConfig config; | ||
|
||
public JniDbzSourceHandler(DbzConnectorConfig config) { | ||
this.config = config; | ||
} | ||
|
||
public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr) | ||
throws com.google.protobuf.InvalidProtocolBufferException { | ||
var request = | ||
ConnectorServiceProto.GetEventStreamRequest.parseFrom(getEventStreamRequestBytes); | ||
|
||
// For jni.rs | ||
java.lang.Thread.currentThread() | ||
.setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader()); | ||
// userProps extracted from request, underlying implementation is UnmodifiableMap | ||
Map<String, String> mutableUserProps = new HashMap<>(request.getPropertiesMap()); | ||
mutableUserProps.put("source.id", Long.toString(request.getSourceId())); | ||
var config = | ||
chenzl25 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
new DbzConnectorConfig( | ||
SourceTypeE.valueOf(request.getSourceType()), | ||
request.getSourceId(), | ||
request.getStartOffset(), | ||
mutableUserProps, | ||
request.getSnapshotDone()); | ||
JniDbzSourceHandler handler = new JniDbzSourceHandler(config); | ||
handler.start(channelPtr); | ||
} | ||
|
||
public void start(long channelPtr) { | ||
var runner = DbzCdcEngineRunner.newCdcEngineRunner(config); | ||
if (runner == null) { | ||
return; | ||
} | ||
|
||
try { | ||
// Start the engine | ||
runner.start(); | ||
LOG.info("Start consuming events of table {}", config.getSourceId()); | ||
|
||
while (runner.isRunning()) { | ||
// check whether the send queue has room for new messages | ||
// Thread will block on the channel to get output from engine | ||
var resp = runner.getEngine().getOutputChannel().poll(500, TimeUnit.MILLISECONDS); | ||
boolean success; | ||
if (resp != null) { | ||
ConnectorNodeMetrics.incSourceRowsReceived( | ||
config.getSourceType().toString(), | ||
String.valueOf(config.getSourceId()), | ||
resp.getEventsCount()); | ||
LOG.info( | ||
"Engine#{}: emit one chunk {} events to network ", | ||
config.getSourceId(), | ||
resp.getEventsCount()); | ||
success = Binding.sendCdcSourceMsgToChannel(channelPtr, resp.toByteArray()); | ||
} else { | ||
// If resp is null means just check whether channel is closed. | ||
success = Binding.sendCdcSourceMsgToChannel(channelPtr, null); | ||
} | ||
if (!success) { | ||
LOG.info( | ||
"Engine#{}: JNI sender broken detected, stop the engine", | ||
config.getSourceId()); | ||
runner.stop(); | ||
return; | ||
} | ||
} | ||
} catch (Throwable t) { | ||
LOG.error("Cdc engine failed.", t); | ||
try { | ||
runner.stop(); | ||
} catch (Exception e) { | ||
LOG.warn("Failed to stop Engine#{}", config.getSourceId(), e); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add default CONNECTOR_LIBS_PATH to the Dockerfile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do I miss any other Dockerfiles? cc @fuyufjh @BugenZhao
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK this is the only one