Skip to content

Commit

Permalink
Pass metadata to PipelineDDLGenerator
Browse files — browse the repository at this point in the history
  • Loading branch information
strongduanmu committed Dec 17, 2024
1 parent dab374c commit 77457cb
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.generator;

import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
Expand All @@ -32,6 +33,7 @@
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.schema.util.IndexMetaDataUtils;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.sql.parser.statement.core.segment.SQLSegment;
Expand All @@ -55,9 +57,12 @@
/**
* Pipeline DDL generator.
*/
@RequiredArgsConstructor
@Slf4j
public final class PipelineDDLGenerator {

private final ShardingSphereMetaData metaData;

private static final String SET_SEARCH_PATH_PREFIX = "set search_path";

/**
Expand Down Expand Up @@ -129,7 +134,7 @@ private String decorateActualSQL(final String databaseName, final String targetT
}

private SQLStatementContext parseSQL(final String currentDatabaseName, final SQLParserEngine parserEngine, final String sql) {
return new SQLBindEngine(null, currentDatabaseName, new HintValueContext()).bind(parserEngine.parse(sql, true), Collections.emptyList());
return new SQLBindEngine(metaData, currentDatabaseName, new HintValueContext()).bind(parserEngine.parse(sql, true), Collections.emptyList());
}

private void appendFromIndexAndConstraint(final Map<SQLSegment, String> replaceMap, final String targetTableName, final SQLStatementContext sqlStatementContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;

import javax.sql.DataSource;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws
final long startTimeMillis = System.currentTimeMillis();
PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
List<String> createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine());
List<String> createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(), param.getMetaData());
try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) {
for (String sql : createTargetTableSQL) {
executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(sql));
Expand All @@ -120,14 +121,14 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws
log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() - startTimeMillis);
}

private List<String> getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig,
final PipelineDataSourceManager dataSourceManager, final SQLParserEngine sqlParserEngine) throws SQLException {
private List<String> getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig, final PipelineDataSourceManager dataSourceManager,
final SQLParserEngine sqlParserEngine, final ShardingSphereMetaData metaData) throws SQLException {
DatabaseType databaseType = createTableConfig.getSourceDataSourceConfig().getDatabaseType();
DataSource sourceDataSource = dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
String schemaName = createTableConfig.getSourceName().getSchemaName();
String sourceTableName = createTableConfig.getSourceName().getTableName();
String targetTableName = createTableConfig.getTargetName().getTableName();
return new PipelineDDLGenerator().generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine);
return new PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine);
}

private void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;

import java.util.Collection;
Expand All @@ -36,4 +37,6 @@ public final class PrepareTargetTablesParameter {
private final PipelineDataSourceManager dataSourceManager;

private final SQLParserEngine sqlParserEngine;

private final ShardingSphereMetaData metaData;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
Expand All @@ -48,7 +49,7 @@ void assertPrepareTargetTables() {
CreateTableConfiguration createTableConfig = mock(CreateTableConfiguration.class, RETURNS_DEEP_STUBS);
when(createTableConfig.getSourceDataSourceConfig().getDatabaseType()).thenReturn(databaseType);
PrepareTargetTablesParameter parameter = new PrepareTargetTablesParameter(
Collections.singleton(createTableConfig), mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS), mock(SQLParserEngine.class));
Collections.singleton(createTableConfig), mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS), mock(SQLParserEngine.class), mock(ShardingSphereMetaData.class));
assertDoesNotThrow(() -> new PipelineJobDataSourcePreparer(databaseType).prepareTargetTables(parameter));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private void prepareTarget(final MigrationJobItemContext jobItemContext, final D
ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
.getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine));
preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, metaData));
}

private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
Expand Down

0 comments on commit 77457cb

Please sign in to comment.