From 607ba1138f625315fe0ec760b8ad772f5659fb0b Mon Sep 17 00:00:00 2001 From: Hongsheng Zhong Date: Tue, 24 Oct 2023 17:55:40 +0800 Subject: [PATCH] Pipeline E2E compatible with com.mysql:mysql-connector-j:8.0 (#28853) * Pipeline E2E use statement.execute and getResultSet to compatible with com.mysql:mysql-connector-j:8.0 for extended SQL * Revert pipeline E2E compatibility code of com.mysql:mysql-connector-j:8.0 * Disable CDCE2EIT for now --- .../cases/PipelineContainerComposer.java | 9 +++++--- .../general/MySQLMigrationGeneralE2EIT.java | 11 ++++++--- .../primarykey/IndexesMigrationE2EIT.java | 23 ++++++++++++++----- 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java index 7095b587f94e7..fa86987e48918 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java @@ -61,6 +61,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -420,9 +421,11 @@ public List> queryForListWithLog(final String sql) { */ public List> queryForListWithLog(final DataSource dataSource, final String sql) { log.info("Query SQL: {}", sql); - try (Connection connection = dataSource.getConnection()) { - ResultSet resultSet = connection.createStatement().executeQuery(sql); - return transformResultSetToList(resultSet); + try ( + Connection connection = dataSource.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute(sql); + return transformResultSetToList(statement.getResultSet()); } catch (final SQLException ex) { throw new RuntimeException(ex); } diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java index 07317d6d5a30e..e48d3d4c57287 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java @@ -38,6 +38,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; +import javax.sql.DataSource; import java.sql.SQLException; import java.time.LocalDateTime; import java.util.List; @@ -82,14 +83,17 @@ void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLExc containerComposer.startIncrementTask( new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30)); TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30); + containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME)); + containerComposer.sourceExecuteWithLog("INSERT INTO t_order_item (item_id, order_id, user_id, status) VALUES (10000, 10000, 1, 'OK')"); stopMigrationByJobId(containerComposer, orderJobId); startMigrationByJobId(containerComposer, orderJobId); - containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", orderJobId)); - String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item"); - containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", orderItemJobId)); + DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy(); + containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order", 10000); + containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order_item", 10000); Properties algorithmProps = new Properties(); algorithmProps.setProperty("chunk-size", "300"); assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH", algorithmProps); + String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item"); assertMigrationSuccessById(containerComposer, orderItemJobId, "DATA_MATCH", algorithmProps); Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> true); assertMigrationSuccessById(containerComposer, orderItemJobId, "CRC32_MATCH", new Properties()); @@ -97,6 +101,7 @@ void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLExc commitMigrationByJobId(containerComposer, each); } assertTrue(listJobId(containerComposer).isEmpty()); + containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, PipelineContainerComposer.TABLE_INIT_ROW_COUNT, ""); } } diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java index 8c2b40dd2e123..d4b01eee70576 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey; import lombok.SneakyThrows; +import org.apache.commons.codec.binary.Hex; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType; @@ -39,6 +40,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; +import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -90,12 +92,13 @@ void assertNoUniqueKeyMigrationSuccess(final PipelineTestParameter testParam) th } KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm(); // TODO PostgreSQL update delete events not support if table without unique keys at increment task. - final Consumer incrementalTaskFn = unused -> { + final Consumer incrementalTaskFn = dataSource -> { if (containerComposer.getDatabaseType() instanceof MySQLDatabaseType) { doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey()); } Object orderId = keyGenerateAlgorithm.generateKey(); insertOneOrder(containerComposer, orderId); + containerComposer.assertOrderRecordExist(dataSource, "t_order", orderId); }; assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn); } @@ -169,9 +172,10 @@ void assertMultiPrimaryKeyMigrationSuccess(final PipelineTestParameter testParam } KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm(); Object uniqueKey = keyGenerateAlgorithm.generateKey(); - assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> { + assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> { insertOneOrder(containerComposer, uniqueKey); doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey()); + containerComposer.assertOrderRecordExist(dataSource, "t_order", uniqueKey); }); } } @@ -191,9 +195,10 @@ void assertMultiUniqueKeyMigrationSuccess(final PipelineTestParameter testParam) } KeyGenerateAlgorithm keyGenerateAlgorithm = new AutoIncrementKeyGenerateAlgorithm(); Object uniqueKey = keyGenerateAlgorithm.generateKey(); - assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> { + assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> { insertOneOrder(containerComposer, uniqueKey); doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey()); + containerComposer.assertOrderRecordExist(dataSource, "t_order", uniqueKey); }); } } @@ -215,12 +220,16 @@ void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess(final PipelineTestPa KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm(); // TODO Insert binary string in VARBINARY column. But KeyGenerateAlgorithm.generateKey() require returning Comparable, and byte[] is not Comparable byte[] uniqueKey = new byte[]{-1, 0, 1}; - assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> insertOneOrder(containerComposer, uniqueKey)); + assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> { + insertOneOrder(containerComposer, uniqueKey); + // TODO Select by byte[] from proxy doesn't work, so unhex function is used for now + containerComposer.assertOrderRecordExist(dataSource, String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey))); + }); } } private void assertMigrationSuccess(final PipelineContainerComposer containerComposer, final String sqlPattern, final String shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm, - final String consistencyCheckAlgorithmType, final Consumer incrementalTaskFn) throws Exception { + final String consistencyCheckAlgorithmType, final Consumer incrementalTaskFn) throws Exception { containerComposer.sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_NAME)); try (Connection connection = containerComposer.getSourceDataSource().getConnection()) { PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, SOURCE_TABLE_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT); @@ -232,12 +241,14 @@ private void assertMigrationSuccess(final PipelineContainerComposer containerCom startMigration(containerComposer, SOURCE_TABLE_NAME, TARGET_TABLE_NAME); String jobId = listJobId(containerComposer).get(0); containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId)); - incrementalTaskFn.accept(null); + DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy(); + incrementalTaskFn.accept(jdbcDataSource); containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId)); if (null != consistencyCheckAlgorithmType) { assertCheckMigrationSuccess(containerComposer, jobId, consistencyCheckAlgorithmType); } commitMigrationByJobId(containerComposer, jobId); + assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1)); List lastJobIds = listJobId(containerComposer); assertTrue(lastJobIds.isEmpty()); }