From d7080bd3b9e7e931fe6c22627e39fc3206d7649f Mon Sep 17 00:00:00 2001 From: zhangliang Date: Wed, 8 Nov 2023 23:37:06 +0800 Subject: [PATCH] Rename CaseInsensitiveQualifiedTable --- .../config/CreateTableConfiguration.java | 6 +++--- ...ava => CaseInsensitiveQualifiedTable.java} | 20 +++++++------------ .../table/TableInventoryCheckParameter.java | 6 +++--- ...ingleTableInventoryCalculateParameter.java | 4 ++-- ...C32SingleTableInventoryCalculatorTest.java | 5 +++-- .../migration/api/impl/MigrationJobAPI.java | 6 +++--- .../MigrationDataConsistencyChecker.java | 14 ++++++------- .../e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 12 +++++------ ...ordSingleTableInventoryCalculatorTest.java | 4 ++-- 9 files changed, 36 insertions(+), 41 deletions(-) rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/{SchemaTableName.java => CaseInsensitiveQualifiedTable.java} (75%) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/CreateTableConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/CreateTableConfiguration.java index ede27092550ac..676ef228a48cb 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/CreateTableConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/CreateTableConfiguration.java @@ -21,7 +21,7 @@ import lombok.RequiredArgsConstructor; import lombok.ToString; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; /** * Create table configuration. @@ -33,9 +33,9 @@ public final class CreateTableConfiguration { private final PipelineDataSourceConfiguration sourceDataSourceConfig; - private final SchemaTableName sourceName; + private final CaseInsensitiveQualifiedTable sourceName; private final PipelineDataSourceConfiguration targetDataSourceConfig; - private final SchemaTableName targetName; + private final CaseInsensitiveQualifiedTable targetName; } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveQualifiedTable.java similarity index 75% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveQualifiedTable.java index 316cac8d5f4bf..d22be4eaefbd1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveQualifiedTable.java @@ -20,33 +20,27 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.RequiredArgsConstructor; -import lombok.ToString; /** - * Schema name and table name. + * Case insensitive qualified table. */ @RequiredArgsConstructor @Getter @EqualsAndHashCode -@ToString -public final class SchemaTableName { +// TODO should merge with QualifiedTable +public final class CaseInsensitiveQualifiedTable { private final CaseInsensitiveIdentifier schemaName; private final CaseInsensitiveIdentifier tableName; - public SchemaTableName(final String schemaName, final String tableName) { + public CaseInsensitiveQualifiedTable(final String schemaName, final String tableName) { this.schemaName = new CaseInsensitiveIdentifier(schemaName); this.tableName = new CaseInsensitiveIdentifier(tableName); } - /** - * Marshal to text. - * - * @return text - */ - public String marshal() { - String schemaName = this.schemaName.toString(); - return null == schemaName ? tableName.toString() : schemaName + "." + tableName; + @Override + public String toString() { + return null == schemaName ? tableName.toString() : String.join(".", schemaName.toString(), tableName.toString()); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java index 444591e409b6d..a0991d039b399 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java @@ -19,7 +19,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; @@ -40,9 +40,9 @@ public final class TableInventoryCheckParameter { private final PipelineDataSourceWrapper targetDataSource; - private final SchemaTableName sourceTable; + private final CaseInsensitiveQualifiedTable sourceTable; - private final SchemaTableName targetTable; + private final CaseInsensitiveQualifiedTable targetTable; private final List columnNames; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java index 1b17a1fe01889..f4e29fe27b5a0 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java @@ -19,7 +19,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; @@ -40,7 +40,7 @@ public final class SingleTableInventoryCalculateParameter { */ private final PipelineDataSourceWrapper dataSource; - private final SchemaTableName table; + private final CaseInsensitiveQualifiedTable table; private final List columnNames; diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java index fbb6eb93b9aa3..78d83394230a9 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult; @@ -65,7 +65,8 @@ class CRC32SingleTableInventoryCalculatorTest { void setUp() throws SQLException { DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "FIXTURE"); List uniqueKeys = Collections.singletonList(new PipelineColumnMetaData(1, "id", Types.INTEGER, "integer", false, true, true)); - parameter = new SingleTableInventoryCalculateParameter(pipelineDataSource, new SchemaTableName(null, "foo_tbl"), Arrays.asList("foo_col", "bar_col"), uniqueKeys, Collections.emptyMap()); + parameter = new SingleTableInventoryCalculateParameter(pipelineDataSource, + new CaseInsensitiveQualifiedTable(null, "foo_tbl"), Arrays.asList("foo_col", "bar_col"), uniqueKeys, Collections.emptyMap()); when(pipelineDataSource.getDatabaseType()).thenReturn(databaseType); when(pipelineDataSource.getConnection()).thenReturn(connection); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index bd5853ac5bd20..98f23834bfaeb 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -41,7 +41,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry; import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; @@ -284,8 +284,8 @@ private Collection buildCreateTableConfigurations(fina DataNode dataNode = each.getDataNodes().get(0); PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName()); CreateTableConfiguration createTableConfig = new CreateTableConfiguration( - sourceDataSourceConfig, new SchemaTableName(sourceSchemaName, dataNode.getTableName()), - jobConfig.getTarget(), new SchemaTableName(targetSchemaName, each.getLogicTableName())); + sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()), + jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName())); result.add(createTableConfig); } log.info("buildCreateTableConfigurations, result={}", result); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java index 18ba1c25c9a87..addf2dee11af1 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; @@ -86,7 +86,7 @@ public Map check(final String algorithm progressContext.setRecordsCount(getRecordsCount()); progressContext.getTableNames().addAll(sourceTableNames); progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(0)); - Map result = new LinkedHashMap<>(); + Map result = new LinkedHashMap<>(); try ( PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); TableDataConsistencyChecker tableChecker = TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) { @@ -94,7 +94,7 @@ public Map check(final String algorithm checkTableInventoryData(each, tableChecker, result, dataSourceManager); } } - return result.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().marshal(), Entry::getValue)); + return result.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().toString(), Entry::getValue)); } private long getRecordsCount() { @@ -103,11 +103,11 @@ private long getRecordsCount() { } private void checkTableInventoryData(final JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker, - final Map checkResultMap, final PipelineDataSourceManager dataSourceManager) { + final Map checkResultMap, final PipelineDataSourceManager dataSourceManager) { for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) { for (DataNode each : entry.getDataNodes()) { TableDataConsistencyCheckResult checkResult = checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, dataSourceManager); - checkResultMap.put(new SchemaTableName(each.getSchemaName(), each.getTableName()), checkResult); + checkResultMap.put(new CaseInsensitiveQualifiedTable(each.getSchemaName(), each.getTableName()), checkResult); if (!checkResult.isMatched() && tableChecker.isBreakOnInventoryCheckNotMatched()) { log.info("Unmatched on table '{}', ignore left tables", DataNodeUtils.formatWithSchema(each)); return; @@ -118,8 +118,8 @@ private void checkTableInventoryData(final JobDataNodeLine jobDataNodeLine, fina private TableDataConsistencyCheckResult checkSingleTableInventoryData(final String targetTableName, final DataNode dataNode, final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager dataSourceManager) { - SchemaTableName sourceTable = new SchemaTableName(dataNode.getSchemaName(), dataNode.getTableName()); - SchemaTableName targetTable = new SchemaTableName(dataNode.getSchemaName(), targetTableName); + CaseInsensitiveQualifiedTable sourceTable = new CaseInsensitiveQualifiedTable(dataNode.getSchemaName(), dataNode.getTableName()); + CaseInsensitiveQualifiedTable targetTable = new CaseInsensitiveQualifiedTable(dataNode.getSchemaName(), targetTableName); PipelineDataSourceWrapper sourceDataSource = dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName())); PipelineDataSourceWrapper targetDataSource = dataSourceManager.getDataSource(jobConfig.getTarget()); PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource); diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java index e26d9e8cbb6e2..70da4a2663aa8 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java @@ -29,7 +29,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; @@ -132,14 +132,14 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ } Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS) .until(() -> listOrderRecords(containerComposer, getOrderTableNameWithSchema(dialectDatabaseMetaData)).size() == actualProxyList.size()); - SchemaTableName orderSchemaTableName = dialectDatabaseMetaData.isSchemaAvailable() - ? new SchemaTableName(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) - : new SchemaTableName(null, SOURCE_TABLE_NAME); + CaseInsensitiveQualifiedTable orderSchemaTableName = dialectDatabaseMetaData.isSchemaAvailable() + ? new CaseInsensitiveQualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) + : new CaseInsensitiveQualifiedTable(null, SOURCE_TABLE_NAME); PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(dataSource, containerComposer.getDatabaseType()); PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4), containerComposer.getDatabaseType()); assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName); - assertDataMatched(sourceDataSource, targetDataSource, new SchemaTableName(null, "t_address")); + assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_address")); containerComposer.proxyExecuteWithLog(String.format("DROP STREAMING '%s'", jobId), 0); assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty()); } @@ -191,7 +191,7 @@ private String getOrderTableNameWithSchema(final DialectDatabaseMetaData dialect return dialectDatabaseMetaData.isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME; } - private void assertDataMatched(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final SchemaTableName schemaTableName) { + private void assertDataMatched(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final CaseInsensitiveQualifiedTable schemaTableName) { StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource); PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().toString(), schemaTableName.getTableName().toString()); List uniqueKeys = Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0))); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java index f658a310c3ee4..31d9f124006a2 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java @@ -19,7 +19,7 @@ import com.zaxxer.hikari.HikariDataSource; import org.apache.commons.lang3.RandomStringUtils; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult; @@ -108,6 +108,6 @@ void assertCalculateOfAllQueryFromMiddle() { private SingleTableInventoryCalculateParameter generateParameter(final PipelineDataSourceWrapper dataSource, final Object dataCheckPosition) { List uniqueKeys = Collections.singletonList(new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "integer", false, true, true)); - return new SingleTableInventoryCalculateParameter(dataSource, new SchemaTableName(null, "t_order"), Collections.emptyList(), uniqueKeys, dataCheckPosition); + return new SingleTableInventoryCalculateParameter(dataSource, new CaseInsensitiveQualifiedTable(null, "t_order"), Collections.emptyList(), uniqueKeys, dataCheckPosition); } }