diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java index 294ebffcc70e8..92028255d28e3 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java @@ -124,10 +124,12 @@ public void testLines() throws InterruptedException, SQLException { int count = 0; while (eventStream.hasNext()) { List messages = eventStream.next().getEventsList(); - for (CdcMessage ignored : messages) { - count++; + for (CdcMessage msg : messages) { + if (!msg.getPayload().isBlank()) { + count++; + } } - if (count == 10000) { + if (count >= 10000) { return count; } } diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java index b673f533948ee..4fff4e7f50ad0 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java @@ -130,10 +130,12 @@ public void testLines() throws Exception { while (eventStream.hasNext()) { List messages = eventStream.next().getEventsList(); - for (ConnectorServiceProto.CdcMessage ignored : messages) { - count++; + for (ConnectorServiceProto.CdcMessage msg : messages) { + if (!msg.getPayload().isBlank()) { + count++; + } } - if (count == 10000) { + if (count >= 10000) { return count; } } diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java index 738d0f850a39b..359746bc90f77 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java @@ -41,7 +41,7 @@ public class SourceTestClient { static final Logger LOG = LoggerFactory.getLogger(SourceTestClient.class.getName()); // default port for connector service - static final int DEFAULT_PORT = 50051; + static final int DEFAULT_PORT = 60051; private final ConnectorServiceGrpc.ConnectorServiceBlockingStub blockingStub; public Properties sqlStmts = new Properties();