diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java index d56e58ebc63e2..ee9c303d1aacf 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.generator; import com.google.common.base.Strings; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder; import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; @@ -32,6 +33,7 @@ import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.hint.HintValueContext; +import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.metadata.database.schema.util.IndexMetaDataUtils; import org.apache.shardingsphere.infra.parser.SQLParserEngine; import org.apache.shardingsphere.sql.parser.statement.core.segment.SQLSegment; @@ -55,9 +57,12 @@ /** * Pipeline DDL generator. */ +@RequiredArgsConstructor @Slf4j public final class PipelineDDLGenerator { + private final ShardingSphereMetaData metaData; + private static final String SET_SEARCH_PATH_PREFIX = "set search_path"; /** @@ -129,7 +134,7 @@ private String decorateActualSQL(final String databaseName, final String targetT } private SQLStatementContext parseSQL(final String currentDatabaseName, final SQLParserEngine parserEngine, final String sql) { - return new SQLBindEngine(null, currentDatabaseName, new HintValueContext()).bind(parserEngine.parse(sql, true), Collections.emptyList()); + return new SQLBindEngine(metaData, currentDatabaseName, new HintValueContext()).bind(parserEngine.parse(sql, true), Collections.emptyList()); } private void appendFromIndexAndConstraint(final Map replaceMap, final String targetTableName, final SQLStatementContext sqlStatementContext) { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java index 20f9928dd7f40..2901c6164427e 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java @@ -31,6 +31,7 @@ import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; +import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.parser.SQLParserEngine; import javax.sql.DataSource; @@ -110,7 +111,7 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws final long startTimeMillis = System.currentTimeMillis(); PipelineDataSourceManager dataSourceManager = param.getDataSourceManager(); for (CreateTableConfiguration each : param.getCreateTableConfigurations()) { - List createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine()); + List createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(), param.getMetaData()); try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) { for (String sql : createTargetTableSQL) { executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(sql)); @@ -120,14 +121,14 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() - startTimeMillis); } - private List getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig, - final PipelineDataSourceManager dataSourceManager, final SQLParserEngine sqlParserEngine) throws SQLException { + private List getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig, final PipelineDataSourceManager dataSourceManager, + final SQLParserEngine sqlParserEngine, final ShardingSphereMetaData metaData) throws SQLException { DatabaseType databaseType = createTableConfig.getSourceDataSourceConfig().getDatabaseType(); DataSource sourceDataSource = dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig()); String schemaName = createTableConfig.getSourceName().getSchemaName(); String sourceTableName = createTableConfig.getSourceName().getTableName(); String targetTableName = createTableConfig.getTargetName().getTableName(); - return new PipelineDDLGenerator().generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine); + return new PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine); } private void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java index 2648729988698..d9be8f69bdb2c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java @@ -20,6 +20,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; +import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.parser.SQLParserEngine; import java.util.Collection; @@ -36,4 +37,6 @@ public final class PrepareTargetTablesParameter { private final PipelineDataSourceManager dataSourceManager; private final SQLParserEngine sqlParserEngine; + + private final ShardingSphereMetaData metaData; } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java index 7661cff53cd16..06d26a8cf5e54 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java @@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.parser.SQLParserEngine; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.junit.jupiter.api.Test; @@ -48,7 +49,7 @@ void assertPrepareTargetTables() { CreateTableConfiguration createTableConfig = mock(CreateTableConfiguration.class, RETURNS_DEEP_STUBS); when(createTableConfig.getSourceDataSourceConfig().getDatabaseType()).thenReturn(databaseType); PrepareTargetTablesParameter parameter = new PrepareTargetTablesParameter( - Collections.singleton(createTableConfig), mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS), mock(SQLParserEngine.class)); + Collections.singleton(createTableConfig), mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS), mock(SQLParserEngine.class), mock(ShardingSphereMetaData.class)); assertDoesNotThrow(() -> new PipelineJobDataSourcePreparer(databaseType).prepareTargetTables(parameter)); } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java index a8e7e6c85e628..110ff5ac616a8 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java @@ -161,7 +161,7 @@ private void prepareTarget(final MigrationJobItemContext jobItemContext, final D ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData(); SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class) .getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType()); - preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine)); + preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, metaData)); } private void prepareIncremental(final MigrationJobItemContext jobItemContext) {