From f246711e6c21a973a70574031016dccad45f5cdc Mon Sep 17 00:00:00 2001 From: Haoran Meng Date: Mon, 23 Dec 2024 13:04:58 +0800 Subject: [PATCH] Fix pipeline e2e for PostgresSQL (#34125) * Fix pipeline e2e for PostgresSQL * Fix checkstyle --- .../generator/PipelineDDLDecorator.java | 177 ++++++++++++++++++ .../generator/PipelineDDLGenerator.java | 156 +-------------- .../PipelineJobDataSourcePreparer.java | 44 ++--- .../param/PrepareTargetTablesParameter.java | 4 - .../PipelineJobDataSourcePreparerTest.java | 3 +- .../preparer/MigrationJobPreparer.java | 7 +- 6 files changed, 208 insertions(+), 183 deletions(-) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLDecorator.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLDecorator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLDecorator.java new file mode 100644 index 0000000000000..d66f840447c67 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLDecorator.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.metadata.generator; + +import com.google.common.base.Strings; +import lombok.AllArgsConstructor; +import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; +import org.apache.shardingsphere.infra.binder.context.statement.ddl.AlterTableStatementContext; +import org.apache.shardingsphere.infra.binder.context.statement.ddl.CommentStatementContext; +import org.apache.shardingsphere.infra.binder.context.statement.ddl.CreateIndexStatementContext; +import org.apache.shardingsphere.infra.binder.context.statement.ddl.CreateTableStatementContext; +import org.apache.shardingsphere.infra.binder.context.type.ConstraintAvailable; +import org.apache.shardingsphere.infra.binder.context.type.IndexAvailable; +import org.apache.shardingsphere.infra.binder.context.type.TableAvailable; +import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine; +import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.hint.HintValueContext; +import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; +import org.apache.shardingsphere.infra.metadata.database.schema.util.IndexMetaDataUtils; +import org.apache.shardingsphere.infra.parser.SQLParserEngine; +import org.apache.shardingsphere.sql.parser.statement.core.segment.SQLSegment; +import org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.constraint.ConstraintSegment; +import org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.index.IndexSegment; +import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment; +import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment; + +import java.util.Collections; +import java.util.Comparator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.TreeMap; + +/** + * Pipeline DDL decorator. + */ +@AllArgsConstructor +public final class PipelineDDLDecorator { + + private static final String SET_SEARCH_PATH_PREFIX = "set search_path"; + + private final ShardingSphereMetaData metaData; + + /** + * Decorate SQL. + * + * @param databaseType database type + * @param targetDatabaseName target database name + * @param schemaName schema name + * @param targetTableName target table name + * @param parserEngine parser engine + * @param sql SQL + * @return decorated SQL + */ + public Optional decorate(final DatabaseType databaseType, final String targetDatabaseName, final String schemaName, final String targetTableName, + final SQLParserEngine parserEngine, final String sql) { + if (Strings.isNullOrEmpty(sql)) { + return Optional.empty(); + } + String result = decorateActualSQL(targetDatabaseName, targetTableName, parserEngine, sql.trim()); + // TODO remove it after set search_path is supported. + if ("openGauss".equals(databaseType.getType())) { + return decorateOpenGauss(targetDatabaseName, schemaName, result, parserEngine); + } + return Optional.of(result); + } + + private String decorateActualSQL(final String databaseName, final String targetTableName, final SQLParserEngine parserEngine, final String sql) { + SQLStatementContext sqlStatementContext = parseSQL(databaseName, parserEngine, sql); + Map replaceMap = new TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex)); + if (sqlStatementContext instanceof CreateTableStatementContext) { + appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext); + appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext); + } + if (sqlStatementContext instanceof CommentStatementContext) { + appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext); + } + if (sqlStatementContext instanceof CreateIndexStatementContext) { + appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext); + appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext); + } + if (sqlStatementContext instanceof AlterTableStatementContext) { + appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext); + appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext); + } + return doDecorateActualTable(replaceMap, sql); + } + + private SQLStatementContext parseSQL(final String currentDatabaseName, final SQLParserEngine parserEngine, final String sql) { + return new SQLBindEngine(metaData, currentDatabaseName, new HintValueContext()).bind(parserEngine.parse(sql, true), Collections.emptyList()); + } + + private void appendFromIndexAndConstraint(final Map replaceMap, final String targetTableName, final SQLStatementContext sqlStatementContext) { + if (!(sqlStatementContext instanceof TableAvailable) || ((TableAvailable) sqlStatementContext).getTablesContext().getSimpleTables().isEmpty()) { + return; + } + TableNameSegment tableNameSegment = ((TableAvailable) sqlStatementContext).getTablesContext().getSimpleTables().iterator().next().getTableName(); + if (!tableNameSegment.getIdentifier().getValue().equals(targetTableName)) { + if (sqlStatementContext instanceof IndexAvailable) { + for (IndexSegment each : ((IndexAvailable) sqlStatementContext).getIndexes()) { + String logicIndexName = IndexMetaDataUtils.getLogicIndexName(each.getIndexName().getIdentifier().getValue(), tableNameSegment.getIdentifier().getValue()); + replaceMap.put(each.getIndexName(), logicIndexName); + } + } + if (sqlStatementContext instanceof ConstraintAvailable) { + for (ConstraintSegment each : ((ConstraintAvailable) sqlStatementContext).getConstraints()) { + String logicConstraint = IndexMetaDataUtils.getLogicIndexName(each.getIdentifier().getValue(), tableNameSegment.getIdentifier().getValue()); + replaceMap.put(each, logicConstraint); + } + } + } + } + + private void appendFromTable(final Map replaceMap, final String targetTableName, final TableAvailable sqlStatementContext) { + for (SimpleTableSegment each : sqlStatementContext.getTablesContext().getSimpleTables()) { + if (!targetTableName.equals(each.getTableName().getIdentifier().getValue())) { + replaceMap.put(each.getTableName(), targetTableName); + } + } + } + + private String doDecorateActualTable(final Map replaceMap, final String sql) { + StringBuilder result = new StringBuilder(); + int lastStopIndex = 0; + for (Entry entry : replaceMap.entrySet()) { + result.append(sql, lastStopIndex, entry.getKey().getStartIndex()); + result.append(entry.getValue()); + lastStopIndex = entry.getKey().getStopIndex() + 1; + } + if (lastStopIndex < sql.length()) { + result.append(sql, lastStopIndex, sql.length()); + } + return result.toString(); + } + + // TODO remove it after set search_path is supported. + private Optional decorateOpenGauss(final String databaseName, final String schemaName, final String queryContext, + final SQLParserEngine parserEngine) { + if (queryContext.toLowerCase().startsWith(SET_SEARCH_PATH_PREFIX)) { + return Optional.empty(); + } + return Optional.of(replaceTableNameWithPrefix(queryContext, schemaName + ".", databaseName, parserEngine)); + } + + private String replaceTableNameWithPrefix(final String sql, final String prefix, final String databaseName, final SQLParserEngine parserEngine) { + SQLStatementContext sqlStatementContext = parseSQL(databaseName, parserEngine, sql); + if (sqlStatementContext instanceof CreateTableStatementContext || sqlStatementContext instanceof CommentStatementContext + || sqlStatementContext instanceof CreateIndexStatementContext || sqlStatementContext instanceof AlterTableStatementContext) { + if (((TableAvailable) sqlStatementContext).getTablesContext().getSimpleTables().isEmpty()) { + return sql; + } + if (((TableAvailable) sqlStatementContext).getTablesContext().getSchemaName().isPresent()) { + return sql; + } + Map replaceMap = new TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex)); + TableNameSegment tableNameSegment = ((TableAvailable) sqlStatementContext).getTablesContext().getSimpleTables().iterator().next().getTableName(); + replaceMap.put(tableNameSegment, prefix + tableNameSegment.getIdentifier().getValue()); + return doDecorateActualTable(replaceMap, sql); + } + return sql; + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java index 95b8b3b85ef61..37d281e4da6a7 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java @@ -17,41 +17,16 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.generator; -import com.google.common.base.Strings; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder; -import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; -import org.apache.shardingsphere.infra.binder.context.statement.ddl.AlterTableStatementContext; -import org.apache.shardingsphere.infra.binder.context.statement.ddl.CommentStatementContext; -import org.apache.shardingsphere.infra.binder.context.statement.ddl.CreateIndexStatementContext; -import org.apache.shardingsphere.infra.binder.context.statement.ddl.CreateTableStatementContext; -import org.apache.shardingsphere.infra.binder.context.type.ConstraintAvailable; -import org.apache.shardingsphere.infra.binder.context.type.IndexAvailable; -import org.apache.shardingsphere.infra.binder.context.type.TableAvailable; -import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import org.apache.shardingsphere.infra.hint.HintValueContext; -import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; -import org.apache.shardingsphere.infra.metadata.database.schema.util.IndexMetaDataUtils; -import org.apache.shardingsphere.infra.parser.SQLParserEngine; -import org.apache.shardingsphere.sql.parser.statement.core.segment.SQLSegment; -import org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.constraint.ConstraintSegment; -import org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.index.IndexSegment; -import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment; -import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment; import javax.sql.DataSource; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.TreeMap; /** * Pipeline DDL generator. @@ -60,10 +35,6 @@ @Slf4j public final class PipelineDDLGenerator { - private static final String SET_SEARCH_PATH_PREFIX = "set search_path"; - - private final ShardingSphereMetaData metaData; - /** * Generate logic DDL. * @@ -72,135 +43,16 @@ public final class PipelineDDLGenerator { * @param schemaName schema name * @param sourceTableName source table name * @param targetTableName target table name - * @param parserEngine parser engine - * @param targetDatabaseName target database name * @return DDL SQL * @throws SQLException SQL exception */ - public List generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource, - final String schemaName, final String sourceTableName, final String targetTableName, - final SQLParserEngine parserEngine, final String targetDatabaseName) throws SQLException { + public static List generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource, + final String schemaName, final String sourceTableName, final String targetTableName) throws SQLException { long startTimeMillis = System.currentTimeMillis(); - List result = new ArrayList<>(); - for (String each : DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType).buildCreateTableSQLs(sourceDataSource, schemaName, sourceTableName)) { - Optional queryContext = decorate(databaseType, targetDatabaseName, schemaName, targetTableName, parserEngine, each); - queryContext.ifPresent(sql -> { - String trimmedSql = sql.trim(); - if (!Strings.isNullOrEmpty(trimmedSql)) { - result.add(trimmedSql); - } - }); - } + List result = new ArrayList<>(DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType) + .buildCreateTableSQLs(sourceDataSource, schemaName, sourceTableName)); log.info("generateLogicDDL, databaseType={}, schemaName={}, sourceTableName={}, targetTableName={}, cost {} ms", databaseType.getType(), schemaName, sourceTableName, targetTableName, System.currentTimeMillis() - startTimeMillis); return result; } - - private Optional decorate(final DatabaseType databaseType, final String targetDatabaseName, final String schemaName, final String targetTableName, - final SQLParserEngine parserEngine, final String sql) { - if (Strings.isNullOrEmpty(sql)) { - return Optional.empty(); - } - String result = decorateActualSQL(targetDatabaseName, targetTableName, parserEngine, sql.trim()); - // TODO remove it after set search_path is supported. - if ("openGauss".equals(databaseType.getType())) { - return decorateOpenGauss(targetDatabaseName, schemaName, result, parserEngine); - } - return Optional.of(result); - } - - private String decorateActualSQL(final String databaseName, final String targetTableName, final SQLParserEngine parserEngine, final String sql) { - SQLStatementContext sqlStatementContext = parseSQL(databaseName, parserEngine, sql); - Map replaceMap = new TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex)); - if (sqlStatementContext instanceof CreateTableStatementContext) { - appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext); - appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext); - } - if (sqlStatementContext instanceof CommentStatementContext) { - appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext); - } - if (sqlStatementContext instanceof CreateIndexStatementContext) { - appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext); - appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext); - } - if (sqlStatementContext instanceof AlterTableStatementContext) { - appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext); - appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext); - } - return doDecorateActualTable(replaceMap, sql); - } - - private SQLStatementContext parseSQL(final String currentDatabaseName, final SQLParserEngine parserEngine, final String sql) { - return new SQLBindEngine(metaData, currentDatabaseName, new HintValueContext()).bind(parserEngine.parse(sql, true), Collections.emptyList()); - } - - private void appendFromIndexAndConstraint(final Map replaceMap, final String targetTableName, final SQLStatementContext sqlStatementContext) { - if (!(sqlStatementContext instanceof TableAvailable) || ((TableAvailable) sqlStatementContext).getTablesContext().getSimpleTables().isEmpty()) { - return; - } - TableNameSegment tableNameSegment = ((TableAvailable) sqlStatementContext).getTablesContext().getSimpleTables().iterator().next().getTableName(); - if (!tableNameSegment.getIdentifier().getValue().equals(targetTableName)) { - if (sqlStatementContext instanceof IndexAvailable) { - for (IndexSegment each : ((IndexAvailable) sqlStatementContext).getIndexes()) { - String logicIndexName = IndexMetaDataUtils.getLogicIndexName(each.getIndexName().getIdentifier().getValue(), tableNameSegment.getIdentifier().getValue()); - replaceMap.put(each.getIndexName(), logicIndexName); - } - } - if (sqlStatementContext instanceof ConstraintAvailable) { - for (ConstraintSegment each : ((ConstraintAvailable) sqlStatementContext).getConstraints()) { - String logicConstraint = IndexMetaDataUtils.getLogicIndexName(each.getIdentifier().getValue(), tableNameSegment.getIdentifier().getValue()); - replaceMap.put(each, logicConstraint); - } - } - } - } - - private void appendFromTable(final Map replaceMap, final String targetTableName, final TableAvailable sqlStatementContext) { - for (SimpleTableSegment each : sqlStatementContext.getTablesContext().getSimpleTables()) { - if (!targetTableName.equals(each.getTableName().getIdentifier().getValue())) { - replaceMap.put(each.getTableName(), targetTableName); - } - } - } - - private String doDecorateActualTable(final Map replaceMap, final String sql) { - StringBuilder result = new StringBuilder(); - int lastStopIndex = 0; - for (Entry entry : replaceMap.entrySet()) { - result.append(sql, lastStopIndex, entry.getKey().getStartIndex()); - result.append(entry.getValue()); - lastStopIndex = entry.getKey().getStopIndex() + 1; - } - if (lastStopIndex < sql.length()) { - result.append(sql, lastStopIndex, sql.length()); - } - return result.toString(); - } - - // TODO remove it after set search_path is supported. - private Optional decorateOpenGauss(final String databaseName, final String schemaName, final String queryContext, - final SQLParserEngine parserEngine) { - if (queryContext.toLowerCase().startsWith(SET_SEARCH_PATH_PREFIX)) { - return Optional.empty(); - } - return Optional.of(replaceTableNameWithPrefix(queryContext, schemaName + ".", databaseName, parserEngine)); - } - - private String replaceTableNameWithPrefix(final String sql, final String prefix, final String databaseName, final SQLParserEngine parserEngine) { - SQLStatementContext sqlStatementContext = parseSQL(databaseName, parserEngine, sql); - if (sqlStatementContext instanceof CreateTableStatementContext || sqlStatementContext instanceof CommentStatementContext - || sqlStatementContext instanceof CreateIndexStatementContext || sqlStatementContext instanceof AlterTableStatementContext) { - if (((TableAvailable) sqlStatementContext).getTablesContext().getSimpleTables().isEmpty()) { - return sql; - } - if (((TableAvailable) sqlStatementContext).getTablesContext().getSchemaName().isPresent()) { - return sql; - } - Map replaceMap = new TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex)); - TableNameSegment tableNameSegment = ((TableAvailable) sqlStatementContext).getTablesContext().getSimpleTables().iterator().next().getTableName(); - replaceMap.put(tableNameSegment, prefix + tableNameSegment.getIdentifier().getValue()); - return doDecorateActualTable(replaceMap, sql); - } - return sql; - } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java index c256c37ae2a14..9d127680312e9 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java @@ -17,10 +17,12 @@ package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; +import com.google.common.base.Strings; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; +import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLDecorator; import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration; @@ -41,10 +43,8 @@ import java.sql.Statement; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.regex.Pattern; @@ -65,16 +65,14 @@ public final class PipelineJobDataSourcePreparer { * Prepare target schemas. * * @param param prepare target schemas parameter - * @return target schemas * @throws SQLException if prepare target schema fail */ - public Map prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException { + public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException { DatabaseType targetDatabaseType = param.getTargetDatabaseType(); DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData(); if (!dialectDatabaseMetaData.isSchemaAvailable()) { - return Collections.emptyMap(); + return; } - Map result = new HashMap<>(param.getCreateTableConfigurations().size(), 1F); String defaultSchema = dialectDatabaseMetaData.getDefaultSchema().orElse(null); PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(targetDatabaseType); Collection createdSchemaNames = new HashSet<>(param.getCreateTableConfigurations().size(), 1F); @@ -85,21 +83,19 @@ public Map prepareTargetSchemas(final PrepareTar } Optional sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName); if (sql.isPresent()) { - executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get()).ifPresent(metaData -> result.put(targetSchemaName, metaData)); + executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get()); createdSchemaNames.add(targetSchemaName); } } - return result; } - private Optional executeCreateSchema(final PipelineDataSourceManager dataSourceManager, - final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException { + private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, + final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException { log.info("Prepare target schemas SQL: {}", sql); try ( Connection connection = dataSourceManager.getDataSource(targetDataSourceConfig).getConnection(); Statement statement = connection.createStatement()) { statement.execute(sql); - return Optional.of(((ShardingSphereConnection) connection).getContextManager().getMetaDataContexts().getMetaData()); } catch (final SQLException ex) { if (DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType) .map(DialectPipelineJobDataSourcePrepareOption::isSupportIfNotExistsOnCreateSchema).orElse(true)) { @@ -107,7 +103,6 @@ private Optional executeCreateSchema(final PipelineDataS } log.warn("Create schema failed", ex); } - return Optional.empty(); } /** @@ -121,27 +116,34 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws PipelineDataSourceManager dataSourceManager = param.getDataSourceManager(); for (CreateTableConfiguration each : param.getCreateTableConfigurations()) { try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) { - ShardingSphereMetaData metaData = param.getTargetSchemaMetaData().get(each.getTargetName().getSchemaName()); - if (null == metaData) { - metaData = ((ShardingSphereConnection) targetConnection).getContextManager().getMetaDataContexts().getMetaData(); - } - List createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(), metaData, param.getTargetDatabaseName()); + List createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager); for (String sql : createTargetTableSQL) { - executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(sql)); + ShardingSphereMetaData metaData = ((ShardingSphereConnection) targetConnection).getContextManager().getMetaDataContexts().getMetaData(); + Optional decoratedSQL = decorateTargetTableSQL(each, param.getSqlParserEngine(), metaData, param.getTargetDatabaseName(), sql); + if (decoratedSQL.isPresent()) { + executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(decoratedSQL.get())); + } } } } log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() - startTimeMillis); } - private List getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig, final PipelineDataSourceManager dataSourceManager, - final SQLParserEngine sqlParserEngine, final ShardingSphereMetaData metaData, final String targetDatabaseName) throws SQLException { + private List getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig, final PipelineDataSourceManager dataSourceManager) throws SQLException { DatabaseType databaseType = createTableConfig.getSourceDataSourceConfig().getDatabaseType(); DataSource sourceDataSource = dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig()); String schemaName = createTableConfig.getSourceName().getSchemaName(); String sourceTableName = createTableConfig.getSourceName().getTableName(); String targetTableName = createTableConfig.getTargetName().getTableName(); - return new PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine, targetDatabaseName); + return PipelineDDLGenerator.generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName); + } + + private Optional decorateTargetTableSQL(final CreateTableConfiguration createTableConfig, final SQLParserEngine sqlParserEngine, + final ShardingSphereMetaData metaData, final String targetDatabaseName, final String sql) { + String schemaName = createTableConfig.getSourceName().getSchemaName(); + String targetTableName = createTableConfig.getTargetName().getTableName(); + Optional decoratedSQL = new PipelineDDLDecorator(metaData).decorate(databaseType, targetDatabaseName, schemaName, targetTableName, sqlParserEngine, sql); + return decoratedSQL.map(String::trim).filter(trimmedSql -> !Strings.isNullOrEmpty(trimmedSql)); } private void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java index f14bb707ff0d7..e0c6cdccfd327 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java @@ -20,11 +20,9 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; -import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.parser.SQLParserEngine; import java.util.Collection; -import java.util.Map; /** * Prepare target tables parameter. @@ -39,7 +37,5 @@ public final class PrepareTargetTablesParameter { private final SQLParserEngine sqlParserEngine; - private final Map targetSchemaMetaData; - private final String targetDatabaseName; } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java index 73013470e88e8..6eec79b803b78 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java @@ -31,7 +31,6 @@ import java.sql.SQLException; import java.util.Collections; -import java.util.Map; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.mockito.ArgumentMatchers.any; @@ -60,7 +59,7 @@ void assertPrepareTargetTables() { when(connection.getContextManager().getMetaDataContexts().getMetaData()).thenReturn(mock(ShardingSphereMetaData.class)); PrepareTargetTablesParameter parameter = new PrepareTargetTablesParameter( Collections.singleton(createTableConfig), pipelineDataSourceManager, - mock(SQLParserEngine.class), mock(Map.class), "foo_db"); + mock(SQLParserEngine.class), "foo_db"); assertDoesNotThrow(() -> new PipelineJobDataSourcePreparer(databaseType).prepareTargetTables(parameter)); } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java index e8d29bf89e2ce..6f88bce5f3ff4 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java @@ -65,19 +65,18 @@ import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException; -import org.apache.shardingsphere.mode.lock.global.GlobalLockNames; import org.apache.shardingsphere.infra.lock.LockContext; import org.apache.shardingsphere.infra.lock.LockDefinition; import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.parser.SQLParserEngine; import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition; +import org.apache.shardingsphere.mode.lock.global.GlobalLockNames; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.parser.rule.SQLParserRule; import java.sql.SQLException; import java.util.Collection; import java.util.Collections; -import java.util.Map; /** * Migration job preparer. @@ -157,11 +156,11 @@ private void prepareTarget(final MigrationJobItemContext jobItemContext, final D Collection createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations(); PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager(); PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(targetDatabaseType); - Map targetSchemaMetaData = preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager)); + preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager)); ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData(); SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class) .getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType()); - preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, targetSchemaMetaData, jobConfig.getTargetDatabaseName())); + preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, jobConfig.getTargetDatabaseName())); } private void prepareIncremental(final MigrationJobItemContext jobItemContext) {