From 50b94625c18d3cc254b3262907bb37b6ebf14843 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sat, 4 Nov 2023 22:32:45 +0800 Subject: [PATCH] Add ActualAndLogicTableNameMapper (#28941) * Rename TableAndSchemaNameMapperTest * Move TableAndSchemaNameMapper * Add ActualAndLogicTableNameMapper * Add ActualAndLogicTableNameMapper * Add ActualAndLogicTableNameMapper * Add ActualAndLogicTableNameMapper * Refactor DumperCommonContext --- .../dumper/context/DumperCommonContext.java | 55 ++---------------- .../context/InventoryDumperContext.java | 2 +- .../mapper/ActualAndLogicTableNameMapper.java | 57 +++++++++++++++++++ .../mapper}/TableAndSchemaNameMapper.java | 2 +- .../mapper/TableAndSchemaNameMapperTest.java | 52 +++++++++++++++++ .../common/config/ImporterConfiguration.java | 2 +- .../pipeline/core/dumper/InventoryDumper.java | 11 ++-- .../InventoryRecordsCountCalculator.java | 3 +- .../core/preparer/InventoryTaskSplitter.java | 11 ++-- .../datasource/DataSourceCheckEngine.java | 2 +- .../datasource/DataSourceCheckEngineTest.java | 2 +- .../mysql/ingest/MySQLIncrementalDumper.java | 11 ++-- .../ingest/MySQLIncrementalDumperTest.java | 7 ++- .../ingest/wal/WALEventConverter.java | 9 +-- .../ingest/PostgreSQLWALDumperTest.java | 7 ++- .../ingest/wal/WALEventConverterTest.java | 7 ++- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 7 ++- .../migration/api/impl/MigrationJobAPI.java | 2 +- ...rationIncrementalDumperContextCreator.java | 5 +- .../importer/PipelineDataSourceSinkTest.java | 2 +- 20 files changed, 161 insertions(+), 95 deletions(-) create mode 100644 kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java rename kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/{context => ingest/dumper/context/mapper}/TableAndSchemaNameMapper.java (97%) create mode 100644 kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java index 4a640fdb7bf8b..9166f609df901 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java @@ -20,16 +20,13 @@ import lombok.Getter; import lombok.Setter; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; -import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; -import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; - -import java.util.Map; /** - * Base dumper context. + * Dumper common context. */ @Getter @Setter @@ -40,53 +37,9 @@ public abstract class DumperCommonContext { private PipelineDataSourceConfiguration dataSourceConfig; - private Map tableNameMap; + private ActualAndLogicTableNameMapper tableNameMapper; private TableAndSchemaNameMapper tableAndSchemaNameMapper; private IngestPosition position; - - /** - * Get logic table name. - * - * @param actualTableName actual table name - * @return logic table name - */ - public LogicTableName getLogicTableName(final String actualTableName) { - return tableNameMap.get(new ActualTableName(actualTableName)); - } - - private LogicTableName getLogicTableName(final ActualTableName actualTableName) { - return tableNameMap.get(actualTableName); - } - - /** - * Whether contains table. - * - * @param actualTableName actual table name - * @return contains or not - */ - public boolean containsTable(final String actualTableName) { - return tableNameMap.containsKey(new ActualTableName(actualTableName)); - } - - /** - * Get schema name. - * - * @param logicTableName logic table name - * @return schema name. nullable - */ - public String getSchemaName(final LogicTableName logicTableName) { - return tableAndSchemaNameMapper.getSchemaName(logicTableName); - } - - /** - * Get schema name. - * - * @param actualTableName actual table name - * @return schema name, can be nullable - */ - public String getSchemaName(final ActualTableName actualTableName) { - return tableAndSchemaNameMapper.getSchemaName(getLogicTableName(actualTableName)); - } } diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java index d72f07c7eafa6..e96375d8c6ed0 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java @@ -54,7 +54,7 @@ public final class InventoryDumperContext extends DumperCommonContext { public InventoryDumperContext(final DumperCommonContext dumperContext) { setDataSourceName(dumperContext.getDataSourceName()); setDataSourceConfig(dumperContext.getDataSourceConfig()); - setTableNameMap(dumperContext.getTableNameMap()); + setTableNameMapper(dumperContext.getTableNameMapper()); setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper()); } diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java new file mode 100644 index 0000000000000..475c1742d5f0a --- /dev/null +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; +import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; + +import java.util.Map; + +/** + * Actual table name and logic table name mapper. + */ +@RequiredArgsConstructor +@Getter +@ToString +public final class ActualAndLogicTableNameMapper { + + private final Map tableNameMap; + + /** + * Get logic table name. + * + * @param actualTableName actual table name + * @return logic table name + */ + public LogicTableName getLogicTableName(final String actualTableName) { + return tableNameMap.get(new ActualTableName(actualTableName)); + } + + /** + * Whether contains table. + * + * @param actualTableName actual table name + * @return contains or not + */ + public boolean containsTable(final String actualTableName) { + return tableNameMap.containsKey(new ActualTableName(actualTableName)); + } +} diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java similarity index 97% rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java index 3999329ae631c..329a24d3a1ef1 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.context; +package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper; import lombok.ToString; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java new file mode 100644 index 0000000000000..465ee71c7eac8 --- /dev/null +++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertNull; + +class TableAndSchemaNameMapperTest { + + @Test + void assertConstructFromNull() { + assertDoesNotThrow(() -> new TableAndSchemaNameMapper((Map) null)); + } + + @Test + void assertConstructFromValueNullMap() { + assertNull(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", null)).getSchemaName("t_order")); + } + + @Test + void assertConstructFromMap() { + assertThat(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public")); + } + + @Test + void assertConstructFromCollection() { + assertThat(new TableAndSchemaNameMapper(Arrays.asList("public.t_order", "t_order_item")).getSchemaName("t_order"), is("public")); + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java index 0116904291a85..1438d9420986b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java @@ -20,7 +20,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java index a8fb1156a7c58..b022f32de7227 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java @@ -21,17 +21,16 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType; -import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData; @@ -46,8 +45,8 @@ import org.apache.shardingsphere.data.pipeline.core.exception.IngestException; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm; -import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import javax.sql.DataSource; @@ -102,7 +101,8 @@ protected void runBlocking() { log.info("Ignored because of already finished."); return; } - PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName()); + PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData( + dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName()); try (Connection connection = dataSource.getConnection()) { dump(tableMetaData, connection); } catch (final SQLException ex) { @@ -156,8 +156,7 @@ private String buildInventoryDumpSQL() { if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) { return dumperContext.getQuerySQL(); } - LogicTableName logicTableName = new LogicTableName(dumperContext.getLogicTableName()); - String schemaName = dumperContext.getSchemaName(logicTableName); + String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); if (!dumperContext.hasUniqueKey()) { return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName()); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java index 6656cdcf9a22e..f1e46de7beec7 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java @@ -21,7 +21,6 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; -import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder; import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException; @@ -52,7 +51,7 @@ public final class InventoryRecordsCountCalculator { * @throws SplitPipelineJobByUniqueKeyException if there's exception from database */ public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) { - String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())); + String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); String actualTableName = dumperContext.getActualTableName(); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(dataSource.getDatabaseType()); Optional sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java index 2f1c6954860ba..971593929183d 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java @@ -20,19 +20,18 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Range; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; -import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineReadConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; -import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition; +import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.StringPrimaryKeyPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition; import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; @@ -111,7 +110,7 @@ public Collection splitInventoryDumperContext(final Inve private Collection splitByTable(final InventoryDumperContext dumperContext) { Collection result = new LinkedList<>(); - dumperContext.getTableNameMap().forEach((key, value) -> { + dumperContext.getTableNameMapper().getTableNameMap().forEach((key, value) -> { InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext); // use original table name, for metadata loader, since some database table name case-sensitive inventoryDumperContext.setActualTableName(key.getOriginal()); @@ -127,7 +126,7 @@ private Collection splitByTable(final InventoryDumperCon private Collection splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) { if (null == dumperContext.getUniqueKeyColumns()) { - String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())); + String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); String actualTableName = dumperContext.getActualTableName(); List uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader()); dumperContext.setUniqueKeyColumns(uniqueKeyColumns); @@ -205,7 +204,7 @@ private Range getUniqueKeyValuesRange(final InventoryIncrementalJobItemCon String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName(); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType()); String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL( - dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey); + dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey); try ( Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java index 7b76c512c8504..d801148321c24 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException; diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java index d74115d295de6..0518f42811596 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index 84cf56db027a3..6d50d4950a03f 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -19,18 +19,18 @@ import com.google.common.base.Preconditions; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; +import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData; @@ -132,7 +132,7 @@ private List handleEvent(final AbstractBinlogEvent event) { return Collections.singletonList(createPlaceholderRecord(event)); } AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event; - if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.containsTable(rowsEvent.getTableName())) { + if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getTableNameMapper().containsTable(rowsEvent.getTableName())) { return Collections.singletonList(createPlaceholderRecord(event)); } PipelineTableMetaData tableMetaData = getPipelineTableMetaData(rowsEvent.getTableName()); @@ -155,7 +155,8 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent even } private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) { - return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName); + LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName); + return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName); } private List handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) { @@ -216,7 +217,7 @@ private Serializable handleValue(final PipelineColumnMetaData columnMetaData, fi } private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) { - String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal(); IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()); DataRecord result = new DataRecord(type, tableName, position, columnCount); result.setActualTableName(rowsEvent.getTableName()); diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index 25bbaa33ff13e..7d6ee11ad2faa 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -17,9 +17,10 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; @@ -91,7 +92,7 @@ void setUp() throws SQLException { private IncrementalDumperContext createDumperContext() { IncrementalDumperContext result = new IncrementalDumperContext(); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root")); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); return result; } diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java index adfdf05966693..cc2dca00d8678 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java @@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; +import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData; @@ -80,7 +80,7 @@ public Record convert(final AbstractWALEvent event) { private boolean filter(final AbstractWALEvent event) { if (event instanceof AbstractRowEvent) { AbstractRowEvent rowEvent = (AbstractRowEvent) event; - return !dumperContext.containsTable(rowEvent.getTableName()); + return !dumperContext.getTableNameMapper().containsTable(rowEvent.getTableName()); } return false; } @@ -90,7 +90,8 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) } private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) { - return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName); + LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName); + return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName); } private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) { @@ -117,7 +118,7 @@ private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final Pipeli } private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) { - String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal(); DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount); result.setActualTableName(rowsEvent.getTableName()); result.setCsn(rowsEvent.getCsn()); diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index e59c6a0f3539e..6cbb3233bac64 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -17,9 +17,10 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager; @@ -107,7 +108,7 @@ private IncrementalDumperContext createDumperContext(final String jdbcUrl, final IncrementalDumperContext result = new IncrementalDumperContext(); result.setJobId("0101123456"); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password)); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")))); result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); return result; } diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index 93a4e72107496..15895cd370b3b 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -17,9 +17,10 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; @@ -86,7 +87,7 @@ void setUp() throws SQLException { private IncrementalDumperContext mockDumperContext() { IncrementalDumperContext result = new IncrementalDumperContext(); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root")); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); return result; } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index d6015da0a5ed0..85f2aea420630 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -21,13 +21,14 @@ import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; @@ -285,7 +286,7 @@ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jo result.setJobId(jobConfig.getJobId()); result.setDataSourceName(dataSourceName); result.setDataSourceConfig(actualDataSourceConfig); - result.setTableNameMap(tableNameMap); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); result.setDecodeWithTX(jobConfig.isDecodeWithTX()); return result; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 218d70427d9b2..0be975696eda6 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java index 06ee39a408b0c..659b15995eb2e 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java @@ -18,7 +18,8 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; @@ -52,7 +53,7 @@ private IncrementalDumperContext buildDumperContext(final String jobId, final St result.setJobId(jobId); result.setDataSourceName(dataSourceName); result.setDataSourceConfig(sourceDataSource); - result.setTableNameMap(tableNameMap); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); return result; } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java index 487df1e7d48c1..3fc0ace7fe6e0 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.importer; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;