Skip to content

Commit

Permalink
Improve CDCE2EIT
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy committed Nov 9, 2023
1 parent 711a2e1 commit 34d59af
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
initSchemaAndTable(containerComposer, connection, 0);
}
DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
startCDCClient(containerComposer, dialectDatabaseMetaData);
final CDCClient cdcClient = buildCDCClientAndStart(containerComposer, dialectDatabaseMetaData);
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
Expand All @@ -132,6 +132,7 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
}
Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS)
.until(() -> listOrderRecords(containerComposer, getOrderTableNameWithSchema(dialectDatabaseMetaData)).size() == actualProxyList.size());
cdcClient.close();
CaseInsensitiveQualifiedTable orderSchemaTableName = dialectDatabaseMetaData.isSchemaAvailable()
? new CaseInsensitiveQualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
: new CaseInsensitiveQualifiedTable(null, SOURCE_TABLE_NAME);
Expand Down Expand Up @@ -166,16 +167,17 @@ private DataSource createStandardDataSource(final PipelineContainerComposer cont
containerComposer.getUsername(), containerComposer.getPassword()));
}

private void startCDCClient(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : "";
CDCClient cdcClient = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
cdcClient.connect(recordConsumer, new RetryStreamingExceptionHandler(cdcClient, 5, 5000), (ctx, result) -> log.error("Server error: {}", result.getErrorMessage()));
cdcClient.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
CDCClient result = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
result.connect(recordConsumer, new RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> log.error("Server error: {}", serverErrorResult.getErrorMessage()));
result.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
// TODO add full=false test case later
cdcClient.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
result.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
SchemaTable.newBuilder().setTable("t_address").build())), true));
return result;
}

private List<Map<String, Object>> listOrderRecords(final PipelineContainerComposer containerComposer, final String tableNameWithSchema) throws SQLException {
Expand Down

0 comments on commit 34d59af

Please sign in to comment.