diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java index 7b63bbca28ade..31720e9e204b0 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java @@ -20,10 +20,10 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; @@ -45,7 +45,7 @@ public final class ImporterConfiguration { private final PipelineDataSourceConfiguration dataSourceConfig; // TODO columnName case-insensitive? - private final Map> shardingColumnsMap; + private final Map> shardingColumnsMap; private final TableAndSchemaNameMapper tableAndSchemaNameMapper; @@ -63,7 +63,7 @@ public final class ImporterConfiguration { * @return logic table names */ public Collection getLogicTableNames() { - return Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(LogicTableName::toString).collect(Collectors.toList())); + return Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(CaseInsensitiveIdentifier::toString).collect(Collectors.toList())); } /** @@ -73,7 +73,7 @@ public Collection getLogicTableNames() { * @return sharding columns */ public Set getShardingColumns(final String logicTableName) { - return shardingColumnsMap.getOrDefault(new LogicTableName(logicTableName), Collections.emptySet()); + return shardingColumnsMap.getOrDefault(new CaseInsensitiveIdentifier(logicTableName), Collections.emptySet()); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java index 997f38aa8e92d..26e3ef3df336a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java @@ -20,8 +20,7 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.apache.commons.lang3.tuple.Pair; -import org.apache.shardingsphere.data.pipeline.common.metadata.ActualTableName; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.infra.datanode.DataNode; import java.util.LinkedHashMap; @@ -74,11 +73,11 @@ private static Map>> groupDataSourceDataNodes * @param dataNodeLine data node line * @return actual table and logic table map */ - public static Map buildTableNameMap(final JobDataNodeLine dataNodeLine) { - Map result = new LinkedHashMap<>(); + public static Map buildTableNameMap(final JobDataNodeLine dataNodeLine) { + Map result = new LinkedHashMap<>(); for (JobDataNodeEntry each : dataNodeLine.getEntries()) { for (DataNode dataNode : each.getDataNodes()) { - result.put(new ActualTableName(dataNode.getTableName()), new LogicTableName(each.getLogicTableName())); + result.put(new CaseInsensitiveIdentifier(dataNode.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName())); } } return result; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/ActualTableName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/ActualTableName.java deleted file mode 100644 index 10463ed166a21..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/ActualTableName.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.common.metadata; - -/** - * Actual table name. - */ -public final class ActualTableName extends TableName { - - public ActualTableName(final String tableName) { - super(tableName); - } -} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java index 3955c6864b01f..feb48a5848343 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java @@ -24,7 +24,7 @@ */ // TODO table name case-sensitive for some database @EqualsAndHashCode(of = "lowercase") -public class CaseInsensitiveIdentifier { +public final class CaseInsensitiveIdentifier { private final String original; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/LogicTableName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/LogicTableName.java deleted file mode 100644 index 6c4045f504814..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/LogicTableName.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.common.metadata; - -/** - * Logic table name. - */ -public final class LogicTableName extends TableName { - - public LogicTableName(final String tableName) { - super(tableName); - } -} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaName.java deleted file mode 100644 index 3ae4a740fdc84..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaName.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.common.metadata; - -import javax.annotation.Nullable; - -/** - * Schema name. - *

It might be null.

- *

It's case-insensitive.

- */ -public class SchemaName extends CaseInsensitiveIdentifier { - - public SchemaName(@Nullable final String schemaName) { - super(schemaName); - } -} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java index 56c3e70abed37..316cac8d5f4bf 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java @@ -19,7 +19,6 @@ import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.ToString; @@ -30,17 +29,15 @@ @Getter @EqualsAndHashCode @ToString -public class SchemaTableName { +public final class SchemaTableName { - @NonNull - private final SchemaName schemaName; + private final CaseInsensitiveIdentifier schemaName; - @NonNull - private final TableName tableName; + private final CaseInsensitiveIdentifier tableName; public SchemaTableName(final String schemaName, final String tableName) { - this.schemaName = new SchemaName(schemaName); - this.tableName = new TableName(tableName); + this.schemaName = new CaseInsensitiveIdentifier(schemaName); + this.tableName = new CaseInsensitiveIdentifier(tableName); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableName.java deleted file mode 100644 index 26f2c17be14d0..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableName.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.common.metadata; - -/** - * Table name. - * - *

It might be logic table name or actual table name.

- *

It's case-insensitive.

- */ -public class TableName extends CaseInsensitiveIdentifier { - - public TableName(final String tableName) { - super(tableName); - } -} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoader.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoader.java index 31708ad4bb739..e7a12eca308e8 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoader.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoader.java @@ -19,11 +19,11 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.metadata.TableName; +import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineIndexMetaData; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; -import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; @@ -53,11 +53,11 @@ public final class StandardPipelineTableMetaDataLoader implements PipelineTableM // It doesn't support ShardingSphereDataSource private final PipelineDataSourceWrapper dataSource; - private final Map tableMetaDataMap = new ConcurrentHashMap<>(); + private final Map tableMetaDataMap = new ConcurrentHashMap<>(); @Override public PipelineTableMetaData getTableMetaData(final String schemaName, final String tableName) { - PipelineTableMetaData result = tableMetaDataMap.get(new TableName(tableName)); + PipelineTableMetaData result = tableMetaDataMap.get(new CaseInsensitiveIdentifier(tableName)); if (null != result) { return result; } @@ -66,7 +66,7 @@ public PipelineTableMetaData getTableMetaData(final String schemaName, final Str } catch (final SQLException ex) { throw new PipelineInternalException(String.format("Load meta data for schema '%s' and table '%s' failed", schemaName, tableName), ex); } - result = tableMetaDataMap.get(new TableName(tableName)); + result = tableMetaDataMap.get(new CaseInsensitiveIdentifier(tableName)); if (null == result) { log.warn("getTableMetaData, can not load meta data for table '{}'", tableName); } @@ -76,12 +76,12 @@ public PipelineTableMetaData getTableMetaData(final String schemaName, final Str private void loadTableMetaData(final String schemaName, final String tableNamePattern) throws SQLException { try (Connection connection = dataSource.getConnection()) { DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSource.getDatabaseType()).getDialectDatabaseMetaData(); - Map tableMetaDataMap = loadTableMetaData0(connection, dialectDatabaseMetaData.isSchemaAvailable() ? schemaName : null, tableNamePattern); + Map tableMetaDataMap = loadTableMetaData0(connection, dialectDatabaseMetaData.isSchemaAvailable() ? schemaName : null, tableNamePattern); this.tableMetaDataMap.putAll(tableMetaDataMap); } } - private Map loadTableMetaData0(final Connection connection, final String schemaName, final String tableNamePattern) throws SQLException { + private Map loadTableMetaData0(final Connection connection, final String schemaName, final String tableNamePattern) throws SQLException { Collection tableNames = new LinkedList<>(); try (ResultSet resultSet = connection.getMetaData().getTables(connection.getCatalog(), schemaName, tableNamePattern, null)) { while (resultSet.next()) { @@ -89,7 +89,7 @@ private Map loadTableMetaData0(final Connectio tableNames.add(tableName); } } - Map result = new LinkedHashMap<>(); + Map result = new LinkedHashMap<>(); for (String each : tableNames) { Set primaryKeys = loadPrimaryKeys(connection, schemaName, each); Map> uniqueKeys = loadUniqueIndexesOfTable(connection, schemaName, each); @@ -112,7 +112,7 @@ private Map loadTableMetaData0(final Connectio } Collection uniqueIndexMetaData = uniqueKeys.entrySet().stream() .map(entry -> new PipelineIndexMetaData(entry.getKey(), entry.getValue().stream().map(columnMetaDataMap::get).collect(Collectors.toList()))).collect(Collectors.toList()); - result.put(new TableName(each), new PipelineTableMetaData(each, columnMetaDataMap, uniqueIndexMetaData)); + result.put(new CaseInsensitiveIdentifier(each), new PipelineTableMetaData(each, columnMetaDataMap, uniqueIndexMetaData)); } return result; } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java index 9df6c93f2481c..8299f12e7ef3a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.common.util; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration; import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration; import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration; @@ -48,16 +48,16 @@ public final class ShardingColumnsExtractor { * @param logicTableNames logic table names * @return sharding columns map */ - public Map> getShardingColumnsMap(final Collection yamlRuleConfigs, final Set logicTableNames) { + public Map> getShardingColumnsMap(final Collection yamlRuleConfigs, final Set logicTableNames) { Optional shardingRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(yamlRuleConfigs); if (!shardingRuleConfig.isPresent()) { return Collections.emptyMap(); } Set defaultDatabaseShardingColumns = extractShardingColumns(shardingRuleConfig.get().getDefaultDatabaseShardingStrategy()); Set defaultTableShardingColumns = extractShardingColumns(shardingRuleConfig.get().getDefaultTableShardingStrategy()); - Map> result = new ConcurrentHashMap<>(); + Map> result = new ConcurrentHashMap<>(); for (ShardingTableRuleConfiguration each : shardingRuleConfig.get().getTables()) { - LogicTableName logicTableName = new LogicTableName(each.getLogicTable()); + CaseInsensitiveIdentifier logicTableName = new CaseInsensitiveIdentifier(each.getLogicTable()); if (!logicTableNames.contains(logicTableName)) { continue; } @@ -67,7 +67,7 @@ public Map> getShardingColumnsMap(final Collection tableNameMap; + private final Map tableNameMap; /** * Get logic table name. @@ -41,8 +40,8 @@ public final class ActualAndLogicTableNameMapper { * @param actualTableName actual table name * @return logic table name */ - public LogicTableName getLogicTableName(final String actualTableName) { - return tableNameMap.get(new ActualTableName(actualTableName)); + public CaseInsensitiveIdentifier getLogicTableName(final String actualTableName) { + return tableNameMap.get(new CaseInsensitiveIdentifier(actualTableName)); } /** @@ -52,6 +51,6 @@ public LogicTableName getLogicTableName(final String actualTableName) { * @return contains or not */ public boolean containsTable(final String actualTableName) { - return tableNameMap.containsKey(new ActualTableName(actualTableName)); + return tableNameMap.containsKey(new CaseInsensitiveIdentifier(actualTableName)); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java index bf7c9edd6ddc6..9c250533a54db 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import java.util.Collection; import java.util.Collections; @@ -33,7 +33,7 @@ @ToString public final class TableAndSchemaNameMapper { - private final Map mapping; + private final Map mapping; public TableAndSchemaNameMapper(final Map tableSchemaMap) { mapping = null == tableSchemaMap ? Collections.emptyMap() : getLogicTableNameMap(tableSchemaMap); @@ -44,13 +44,13 @@ public TableAndSchemaNameMapper(final Collection tableNames) { mapping = getLogicTableNameMap(tableNameSchemaMap); } - private Map getLogicTableNameMap(final Map tableSchemaMap) { - Map result = new HashMap<>(tableSchemaMap.size(), 1F); + private Map getLogicTableNameMap(final Map tableSchemaMap) { + Map result = new HashMap<>(tableSchemaMap.size(), 1F); for (Entry entry : tableSchemaMap.entrySet()) { String tableName = entry.getKey(); String schemaName = entry.getValue(); if (null != schemaName) { - result.put(new LogicTableName(tableName), schemaName); + result.put(new CaseInsensitiveIdentifier(tableName), schemaName); } } return result; @@ -63,7 +63,7 @@ private Map getLogicTableNameMap(final Map tableNameMap = new LinkedHashMap<>(); - dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName())))); + Map tableNameMap = new LinkedHashMap<>(); + dataNodeLine.getEntries() + .forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new CaseInsensitiveIdentifier(node.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName())))); return new IncrementalDumperContext( new DumperCommonContext(dataSourceName, actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), tableAndSchemaNameMapper), jobConfig.getJobId(), jobConfig.isDecodeWithTX()); @@ -295,8 +295,8 @@ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfigurati CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig); JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm(); int batchSize = pipelineProcessConfig.getWrite().getBatchSize(); - Map> shardingColumnsMap = new ShardingColumnsExtractor() - .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet())); + Map> shardingColumnsMap = new ShardingColumnsExtractor() + .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet())); return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 987620ee6303b..f58330485e655 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -19,14 +19,9 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration.CreateTableEntry; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; @@ -42,12 +37,16 @@ import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; +import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry; import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; +import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder; import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; @@ -56,6 +55,8 @@ import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException; import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; @@ -69,7 +70,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext; -import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; @@ -265,8 +265,8 @@ public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfig IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator( jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); - Set targetTableNames = jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet()); - Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( + Set targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()); + Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); ImporterConfiguration importerConfig = buildImporterConfiguration( jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); @@ -296,7 +296,7 @@ sourceDataSourceConfig, new SchemaTableName(sourceSchemaName, dataNode.getTableN } private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, - final Map> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { + final Map> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig); JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm(); int batchSize = pipelineProcessConfig.getWrite().getBatchSize(); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java index 88a49fba7a9c0..7908c1a1f2818 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java @@ -17,24 +17,24 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.importer; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel; -import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column; -import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord; -import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord; -import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType; import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; +import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column; +import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord; +import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord; +import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -192,7 +192,7 @@ private DataRecord getDataRecord(final String recordType) { } private ImporterConfiguration mockImporterConfiguration() { - Map> shardingColumnsMap = Collections.singletonMap(new LogicTableName("test_table"), Collections.singleton("user")); + Map> shardingColumnsMap = Collections.singletonMap(new CaseInsensitiveIdentifier("test_table"), Collections.singleton("user")); return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3); } }