Fix pipeline e2e for MySQL #34108

Merged: 1 commit, Dec 20, 2024

File: PipelineDDLGenerator.java

@@ -43,7 +43,6 @@
 import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;
 
 import javax.sql.DataSource;
-import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -74,15 +73,17 @@ public final class PipelineDDLGenerator {
      * @param sourceTableName source table name
      * @param targetTableName target table name
      * @param parserEngine parser engine
+     * @param targetDatabaseName target database name
      * @return DDL SQL
      * @throws SQLException SQL exception
      */
     public List<String> generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource,
-                                         final String schemaName, final String sourceTableName, final String targetTableName, final SQLParserEngine parserEngine) throws SQLException {
+                                         final String schemaName, final String sourceTableName, final String targetTableName,
+                                         final SQLParserEngine parserEngine, final String targetDatabaseName) throws SQLException {
         long startTimeMillis = System.currentTimeMillis();
         List<String> result = new ArrayList<>();
         for (String each : DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType).buildCreateTableSQLs(sourceDataSource, schemaName, sourceTableName)) {
-            Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, targetTableName, parserEngine, each);
+            Optional<String> queryContext = decorate(databaseType, targetDatabaseName, schemaName, targetTableName, parserEngine, each);
             queryContext.ifPresent(sql -> {
                 String trimmedSql = sql.trim();
                 if (!Strings.isNullOrEmpty(trimmedSql)) {
@@ -95,19 +96,15 @@ public List<String> generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource,
         return result;
     }
 
-    private Optional<String> decorate(final DatabaseType databaseType, final DataSource dataSource, final String schemaName, final String targetTableName,
-                                      final SQLParserEngine parserEngine, final String sql) throws SQLException {
+    private Optional<String> decorate(final DatabaseType databaseType, final String targetDatabaseName, final String schemaName, final String targetTableName,
+                                      final SQLParserEngine parserEngine, final String sql) {
         if (Strings.isNullOrEmpty(sql)) {
             return Optional.empty();
         }
-        String databaseName;
-        try (Connection connection = dataSource.getConnection()) {
-            databaseName = connection.getCatalog();
-        }
-        String result = decorateActualSQL(databaseName, targetTableName, parserEngine, sql.trim());
+        String result = decorateActualSQL(targetDatabaseName, targetTableName, parserEngine, sql.trim());
         // TODO remove it after set search_path is supported.
         if ("openGauss".equals(databaseType.getType())) {
-            return decorateOpenGauss(databaseName, schemaName, result, parserEngine);
+            return decorateOpenGauss(targetDatabaseName, schemaName, result, parserEngine);
         }
         return Optional.of(result);
     }

File: PipelineJobDataSourcePreparer.java

@@ -27,6 +27,7 @@
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
+import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -40,8 +41,10 @@
 import java.sql.Statement;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.regex.Pattern;
 
@@ -62,14 +65,16 @@ public final class PipelineJobDataSourcePreparer {
     /**
      * Prepare target schemas.
      *
      * @param param prepare target schemas parameter
+     * @return target schemas
      * @throws SQLException if prepare target schema fail
      */
-    public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException {
+    public Map<String, ShardingSphereMetaData> prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException {
         DatabaseType targetDatabaseType = param.getTargetDatabaseType();
         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
         if (!dialectDatabaseMetaData.isSchemaAvailable()) {
-            return;
+            return Collections.emptyMap();
         }
+        Map<String, ShardingSphereMetaData> result = new HashMap<>(param.getCreateTableConfigurations().size(), 1F);
         String defaultSchema = dialectDatabaseMetaData.getDefaultSchema().orElse(null);
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(targetDatabaseType);
         Collection<String> createdSchemaNames = new HashSet<>(param.getCreateTableConfigurations().size(), 1F);
@@ -80,25 +85,29 @@ public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException {
             }
             Optional<String> sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
             if (sql.isPresent()) {
-                executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get());
+                executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get()).ifPresent(metaData -> result.put(targetSchemaName, metaData));
                 createdSchemaNames.add(targetSchemaName);
             }
         }
+        return result;
     }
 
-    private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
+    private Optional<ShardingSphereMetaData> executeCreateSchema(final PipelineDataSourceManager dataSourceManager,
+                                                                 final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
         log.info("Prepare target schemas SQL: {}", sql);
         try (
                 Connection connection = dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute(sql);
+            return Optional.of(((ShardingSphereConnection) connection).getContextManager().getMetaDataContexts().getMetaData());
         } catch (final SQLException ex) {
             if (DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType)
                     .map(DialectPipelineJobDataSourcePrepareOption::isSupportIfNotExistsOnCreateSchema).orElse(true)) {
                 throw ex;
            }
             log.warn("Create schema failed", ex);
         }
+        return Optional.empty();
     }
 
     /**
@@ -111,8 +120,12 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws SQLException {
         final long startTimeMillis = System.currentTimeMillis();
         PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
         for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
-            List<String> createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(), param.getMetaData());
             try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) {
+                ShardingSphereMetaData metaData = param.getTargetSchemaMetaData().get(each.getTargetName().getSchemaName());
+                if (null == metaData) {
+                    metaData = ((ShardingSphereConnection) targetConnection).getContextManager().getMetaDataContexts().getMetaData();
+                }
+                List<String> createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(), metaData, param.getTargetDatabaseName());
                 for (String sql : createTargetTableSQL) {
                     executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(sql));
                 }
@@ -122,13 +135,13 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws SQLException {
     }
 
     private List<String> getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig, final PipelineDataSourceManager dataSourceManager,
-                                                 final SQLParserEngine sqlParserEngine, final ShardingSphereMetaData metaData) throws SQLException {
+                                                 final SQLParserEngine sqlParserEngine, final ShardingSphereMetaData metaData, final String targetDatabaseName) throws SQLException {
         DatabaseType databaseType = createTableConfig.getSourceDataSourceConfig().getDatabaseType();
         DataSource sourceDataSource = dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
         String schemaName = createTableConfig.getSourceName().getSchemaName();
         String sourceTableName = createTableConfig.getSourceName().getTableName();
         String targetTableName = createTableConfig.getTargetName().getTableName();
-        return new PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine);
+        return new PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine, targetDatabaseName);
     }
 
     private void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {

File: PrepareTargetTablesParameter.java

@@ -24,6 +24,7 @@
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 
 import java.util.Collection;
+import java.util.Map;
 
 /**
  * Prepare target tables parameter.
@@ -38,5 +39,7 @@ public final class PrepareTargetTablesParameter {
 
     private final SQLParserEngine sqlParserEngine;
 
-    private final ShardingSphereMetaData metaData;
+    private final Map<String, ShardingSphereMetaData> targetSchemaMetaData;
+
+    private final String targetDatabaseName;
 }

File: PipelineJobDataSourcePreparerTest.java

@@ -17,19 +17,24 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
+import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
+import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
 
+import java.sql.SQLException;
 import java.util.Collections;
+import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -45,11 +50,17 @@ void assertPrepareTargetSchemasWithSchemaNotAvailable() {
     }
 
     @Test
+    @SneakyThrows(SQLException.class)
     void assertPrepareTargetTables() {
         CreateTableConfiguration createTableConfig = mock(CreateTableConfiguration.class, RETURNS_DEEP_STUBS);
         when(createTableConfig.getSourceDataSourceConfig().getDatabaseType()).thenReturn(databaseType);
+        PipelineDataSourceManager pipelineDataSourceManager = mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS);
+        ShardingSphereConnection connection = mock(ShardingSphereConnection.class, RETURNS_DEEP_STUBS);
+        when(pipelineDataSourceManager.getDataSource(any()).getConnection()).thenReturn(connection);
+        when(connection.getContextManager().getMetaDataContexts().getMetaData()).thenReturn(mock(ShardingSphereMetaData.class));
         PrepareTargetTablesParameter parameter = new PrepareTargetTablesParameter(
-                Collections.singleton(createTableConfig), mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS), mock(SQLParserEngine.class), mock(ShardingSphereMetaData.class));
+                Collections.singleton(createTableConfig), pipelineDataSourceManager,
+                mock(SQLParserEngine.class), mock(Map.class), "foo_db");
         assertDoesNotThrow(() -> new PipelineJobDataSourcePreparer(databaseType).prepareTargetTables(parameter));
     }
 }

File: MigrationJobPreparer.java

@@ -24,8 +24,8 @@
 import org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
@@ -77,6 +77,7 @@
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 
 /**
  * Migration job preparer.
@@ -157,11 +158,11 @@ private void prepareTarget(final MigrationJobItemContext jobItemContext, final DatabaseType targetDatabaseType) throws SQLException {
         Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
         PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
         PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(targetDatabaseType);
-        preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
+        Map<String, ShardingSphereMetaData> targetSchemaMetaData = preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
         ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
         SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
                 .getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
-        preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, metaData));
+        preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, targetSchemaMetaData, jobConfig.getTargetDatabaseName()));
     }
 
     private void prepareIncremental(final MigrationJobItemContext jobItemContext) {