From fd683aaf9b803cf90348db46b5d9acd8591a588a Mon Sep 17 00:00:00 2001 From: zhangliang Date: Tue, 7 Nov 2023 22:51:54 +0800 Subject: [PATCH] Remove LogicTableName --- .../common/config/ImporterConfiguration.java | 10 +++---- .../datanode/JobDataNodeLineConvertUtils.java | 9 +++--- .../common/metadata/ActualTableName.java | 28 ------------------- .../metadata/CaseInsensitiveIdentifier.java | 2 +- .../common/metadata/LogicTableName.java | 28 ------------------- .../common/util/ShardingColumnsExtractor.java | 10 +++---- .../mapper/ActualAndLogicTableNameMapper.java | 11 ++++---- .../mapper/TableAndSchemaNameMapper.java | 14 +++++----- .../mysql/ingest/MySQLIncrementalDumper.java | 4 +-- .../ingest/MySQLIncrementalDumperTest.java | 23 ++++++++------- .../ingest/wal/WALEventConverter.java | 12 ++++---- .../ingest/PostgreSQLWALDumperTest.java | 13 ++++----- .../ingest/wal/WALEventConverterTest.java | 19 ++++++------- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 26 ++++++++--------- .../migration/api/impl/MigrationJobAPI.java | 18 ++++++------ .../importer/PipelineDataSourceSinkTest.java | 16 +++++------ 16 files changed, 91 insertions(+), 152 deletions(-) delete mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/ActualTableName.java delete mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/LogicTableName.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java index 7b63bbca28ade..31720e9e204b0 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java @@ -20,10 +20,10 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; @@ -45,7 +45,7 @@ public final class ImporterConfiguration { private final PipelineDataSourceConfiguration dataSourceConfig; // TODO columnName case-insensitive? - private final Map> shardingColumnsMap; + private final Map> shardingColumnsMap; private final TableAndSchemaNameMapper tableAndSchemaNameMapper; @@ -63,7 +63,7 @@ public final class ImporterConfiguration { * @return logic table names */ public Collection getLogicTableNames() { - return Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(LogicTableName::toString).collect(Collectors.toList())); + return Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(CaseInsensitiveIdentifier::toString).collect(Collectors.toList())); } /** @@ -73,7 +73,7 @@ public Collection getLogicTableNames() { * @return sharding columns */ public Set getShardingColumns(final String logicTableName) { - return shardingColumnsMap.getOrDefault(new LogicTableName(logicTableName), Collections.emptySet()); + return shardingColumnsMap.getOrDefault(new CaseInsensitiveIdentifier(logicTableName), Collections.emptySet()); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java index 997f38aa8e92d..26e3ef3df336a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java @@ -20,8 +20,7 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.apache.commons.lang3.tuple.Pair; -import org.apache.shardingsphere.data.pipeline.common.metadata.ActualTableName; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.infra.datanode.DataNode; import java.util.LinkedHashMap; @@ -74,11 +73,11 @@ private static Map>> groupDataSourceDataNodes * @param dataNodeLine data node line * @return actual table and logic table map */ - public static Map buildTableNameMap(final JobDataNodeLine dataNodeLine) { - Map result = new LinkedHashMap<>(); + public static Map buildTableNameMap(final JobDataNodeLine dataNodeLine) { + Map result = new LinkedHashMap<>(); for (JobDataNodeEntry each : dataNodeLine.getEntries()) { for (DataNode dataNode : each.getDataNodes()) { - result.put(new ActualTableName(dataNode.getTableName()), new LogicTableName(each.getLogicTableName())); + result.put(new CaseInsensitiveIdentifier(dataNode.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName())); } } return result; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/ActualTableName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/ActualTableName.java deleted file mode 100644 index 0c4837f0a4e25..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/ActualTableName.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.common.metadata; - -/** - * Actual table name. - */ -public final class ActualTableName extends CaseInsensitiveIdentifier { - - public ActualTableName(final String tableName) { - super(tableName); - } -} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java index 3955c6864b01f..feb48a5848343 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java @@ -24,7 +24,7 @@ */ // TODO table name case-sensitive for some database @EqualsAndHashCode(of = "lowercase") -public class CaseInsensitiveIdentifier { +public final class CaseInsensitiveIdentifier { private final String original; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/LogicTableName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/LogicTableName.java deleted file mode 100644 index b989a178a58d5..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/LogicTableName.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.common.metadata; - -/** - * Logic table name. - */ -public final class LogicTableName extends CaseInsensitiveIdentifier { - - public LogicTableName(final String tableName) { - super(tableName); - } -} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java index 9df6c93f2481c..8299f12e7ef3a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.common.util; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration; import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration; import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration; @@ -48,16 +48,16 @@ public final class ShardingColumnsExtractor { * @param logicTableNames logic table names * @return sharding columns map */ - public Map> getShardingColumnsMap(final Collection yamlRuleConfigs, final Set logicTableNames) { + public Map> getShardingColumnsMap(final Collection yamlRuleConfigs, final Set logicTableNames) { Optional shardingRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(yamlRuleConfigs); if (!shardingRuleConfig.isPresent()) { return Collections.emptyMap(); } Set defaultDatabaseShardingColumns = extractShardingColumns(shardingRuleConfig.get().getDefaultDatabaseShardingStrategy()); Set defaultTableShardingColumns = extractShardingColumns(shardingRuleConfig.get().getDefaultTableShardingStrategy()); - Map> result = new ConcurrentHashMap<>(); + Map> result = new ConcurrentHashMap<>(); for (ShardingTableRuleConfiguration each : shardingRuleConfig.get().getTables()) { - LogicTableName logicTableName = new LogicTableName(each.getLogicTable()); + CaseInsensitiveIdentifier logicTableName = new CaseInsensitiveIdentifier(each.getLogicTable()); if (!logicTableNames.contains(logicTableName)) { continue; } @@ -67,7 +67,7 @@ public Map> getShardingColumnsMap(final Collection tableNameMap; + private final Map tableNameMap; /** * Get logic table name. @@ -41,8 +40,8 @@ public final class ActualAndLogicTableNameMapper { * @param actualTableName actual table name * @return logic table name */ - public LogicTableName getLogicTableName(final String actualTableName) { - return tableNameMap.get(new ActualTableName(actualTableName)); + public CaseInsensitiveIdentifier getLogicTableName(final String actualTableName) { + return tableNameMap.get(new CaseInsensitiveIdentifier(actualTableName)); } /** @@ -52,6 +51,6 @@ public LogicTableName getLogicTableName(final String actualTableName) { * @return contains or not */ public boolean containsTable(final String actualTableName) { - return tableNameMap.containsKey(new ActualTableName(actualTableName)); + return tableNameMap.containsKey(new CaseInsensitiveIdentifier(actualTableName)); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java index bf7c9edd6ddc6..9c250533a54db 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import java.util.Collection; import java.util.Collections; @@ -33,7 +33,7 @@ @ToString public final class TableAndSchemaNameMapper { - private final Map mapping; + private final Map mapping; public TableAndSchemaNameMapper(final Map tableSchemaMap) { mapping = null == tableSchemaMap ? Collections.emptyMap() : getLogicTableNameMap(tableSchemaMap); @@ -44,13 +44,13 @@ public TableAndSchemaNameMapper(final Collection tableNames) { mapping = getLogicTableNameMap(tableNameSchemaMap); } - private Map getLogicTableNameMap(final Map tableSchemaMap) { - Map result = new HashMap<>(tableSchemaMap.size(), 1F); + private Map getLogicTableNameMap(final Map tableSchemaMap) { + Map result = new HashMap<>(tableSchemaMap.size(), 1F); for (Entry entry : tableSchemaMap.entrySet()) { String tableName = entry.getKey(); String schemaName = entry.getValue(); if (null != schemaName) { - result.put(new LogicTableName(tableName), schemaName); + result.put(new CaseInsensitiveIdentifier(tableName), schemaName); } } return result; @@ -63,7 +63,7 @@ private Map getLogicTableNameMap(final Map tableNameMap = new LinkedHashMap<>(); - dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName())))); + Map tableNameMap = new LinkedHashMap<>(); + dataNodeLine.getEntries() + .forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new CaseInsensitiveIdentifier(node.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName())))); return new IncrementalDumperContext( new DumperCommonContext(dataSourceName, actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), tableAndSchemaNameMapper), jobConfig.getJobId(), jobConfig.isDecodeWithTX()); @@ -295,8 +295,8 @@ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfigurati CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig); JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm(); int batchSize = pipelineProcessConfig.getWrite().getBatchSize(); - Map> shardingColumnsMap = new ShardingColumnsExtractor() - .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet())); + Map> shardingColumnsMap = new ShardingColumnsExtractor() + .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet())); return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 987620ee6303b..f58330485e655 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -19,14 +19,9 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration.CreateTableEntry; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; @@ -42,12 +37,16 @@ import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; +import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry; import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; +import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder; import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; @@ -56,6 +55,8 @@ import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException; import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; @@ -69,7 +70,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext; -import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; @@ -265,8 +265,8 @@ public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfig IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator( jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); - Set targetTableNames = jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet()); - Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( + Set targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()); + Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); ImporterConfiguration importerConfig = buildImporterConfiguration( jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); @@ -296,7 +296,7 @@ sourceDataSourceConfig, new SchemaTableName(sourceSchemaName, dataNode.getTableN } private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, - final Map> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { + final Map> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig); JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm(); int batchSize = pipelineProcessConfig.getWrite().getBatchSize(); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java index 88a49fba7a9c0..7908c1a1f2818 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java @@ -17,24 +17,24 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.importer; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel; -import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column; -import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord; -import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord; -import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType; import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; +import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column; +import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord; +import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord; +import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -192,7 +192,7 @@ private DataRecord getDataRecord(final String recordType) { } private ImporterConfiguration mockImporterConfiguration() { - Map> shardingColumnsMap = Collections.singletonMap(new LogicTableName("test_table"), Collections.singleton("user")); + Map> shardingColumnsMap = Collections.singletonMap(new CaseInsensitiveIdentifier("test_table"), Collections.singleton("user")); return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3); } }