From 547941b05a84352a7bf3e88f5b4767380fabc5f8 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 8 Sep 2023 16:52:32 +0800 Subject: [PATCH] refactor JniDbzSourceHandler --- .../source/core/JniDbzSourceHandler.java | 78 +++++++------------ 1 file changed, 27 insertions(+), 51 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java index 9e69a37139ad6..f27b4b6485903 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java @@ -14,7 +14,6 @@ package com.risingwave.connector.source.core; -import com.risingwave.connector.api.source.CdcEngineRunner; import com.risingwave.connector.api.source.SourceTypeE; import com.risingwave.connector.source.common.DbzConnectorConfig; import com.risingwave.java.binding.Binding; @@ -59,52 +58,6 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long handler.start(channelPtr); } - class OnReadyHandler implements Runnable { - private final CdcEngineRunner runner; - private final long channelPtr; - - public OnReadyHandler(CdcEngineRunner runner, long channelPtr) { - this.runner = runner; - this.channelPtr = channelPtr; - } - - @Override - public void run() { - while (runner.isRunning()) { - try { - // check whether the send queue has room for new messages - // Thread will block on the channel to get output from engine - var resp = - runner.getEngine().getOutputChannel().poll(500, TimeUnit.MILLISECONDS); - boolean success; - if (resp != null) { - ConnectorNodeMetrics.incSourceRowsReceived( - config.getSourceType().toString(), - String.valueOf(config.getSourceId()), - resp.getEventsCount()); - LOG.info( - "Engine#{}: emit one chunk {} events to network ", - config.getSourceId(), - resp.getEventsCount()); - success = Binding.sendCdcSourceMsgToChannel(channelPtr, resp.toByteArray()); - } else { - // If resp is null means just check whether channel is closed. - success = Binding.sendCdcSourceMsgToChannel(channelPtr, null); - } - if (!success) { - LOG.info( - "Engine#{}: JNI sender broken detected, stop the engine", - config.getSourceId()); - runner.stop(); - return; - } - } catch (Throwable e) { - LOG.error("Poll engine output channel fail. ", e); - } - } - } - } - public void start(long channelPtr) { var runner = DbzCdcEngineRunner.newCdcEngineRunner(config); if (runner == null) { @@ -116,10 +69,33 @@ public void start(long channelPtr) { runner.start(); LOG.info("Start consuming events of table {}", config.getSourceId()); - final OnReadyHandler onReadyHandler = new OnReadyHandler(runner, channelPtr); - - onReadyHandler.run(); - + while (runner.isRunning()) { + // check whether the send queue has room for new messages + // Thread will block on the channel to get output from engine + var resp = runner.getEngine().getOutputChannel().poll(500, TimeUnit.MILLISECONDS); + boolean success; + if (resp != null) { + ConnectorNodeMetrics.incSourceRowsReceived( + config.getSourceType().toString(), + String.valueOf(config.getSourceId()), + resp.getEventsCount()); + LOG.info( + "Engine#{}: emit one chunk {} events to network ", + config.getSourceId(), + resp.getEventsCount()); + success = Binding.sendCdcSourceMsgToChannel(channelPtr, resp.toByteArray()); + } else { + // If resp is null means just check whether channel is closed. + success = Binding.sendCdcSourceMsgToChannel(channelPtr, null); + } + if (!success) { + LOG.info( + "Engine#{}: JNI sender broken detected, stop the engine", + config.getSourceId()); + runner.stop(); + return; + } + } } catch (Throwable t) { LOG.error("Cdc engine failed.", t); try {