Commit 2838da8

Remove PipelineJobPreparer.checkSourceDataSource() and checkTargetDataSource() (#29460)

* Remove PipelineJobPreparer.checkSourceDataSource()

* Remove PipelineJobPreparer.checkSourceDataSource()

* Remove PipelineJobPreparer.checkTargetDataSource()
terrymanu authored Dec 19, 2023
1 parent 07d9b04 commit 2838da8
Showing 5 changed files with 46 additions and 54 deletions.
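
In short, the data source checks move off PipelineJobPreparer and onto DataSourceCheckEngine, which now takes a single DataSource rather than a collection. A minimal before/after sketch of the migration source-side call site, using only names that appear in the hunks below:

    // Before: the job preparer wrapped the single data source into a collection.
    new PipelineJobPreparer(sourceDatabaseType)
            .checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));

    // After: the check engine is constructed directly and takes the single data source.
    new DataSourceCheckEngine(sourceDatabaseType)
            .checkSourceDataSource(jobItemContext.getSourceDataSource());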

File 1 of 5: DataSourceCheckEngine.java
@@ -17,19 +17,21 @@

package org.apache.shardingsphere.data.pipeline.core.checker;

import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;

/**
* Data source check engine.
@@ -45,6 +47,30 @@ public DataSourceCheckEngine(final DatabaseType databaseType) {
sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
}

/**
* Check source data source.
*
* @param dataSource to be checked source data source
*/
public void checkSourceDataSource(final DataSource dataSource) {
Collection<DataSource> dataSources = Collections.singleton(dataSource);
checkConnection(dataSources);
checkPrivilege(dataSources);
checkVariable(dataSources);
}

/**
* Check target data source.
*
* @param dataSource to be checked target data source
* @param importerConfig importer configuration
*/
public void checkTargetDataSource(final DataSource dataSource, final ImporterConfiguration importerConfig) {
Collection<DataSource> dataSources = Collections.singleton(dataSource);
checkConnection(dataSources);
checkTargetTable(dataSources, importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
}

/**
* Check data source connections.
*
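Taken together, checkSourceDataSource() runs the connection, privilege, and variable checks, while checkTargetDataSource() runs the connection check plus the target-table-not-empty check. A minimal usage sketch; the databaseType, sourceDataSource, targetDataSource, and importerConfig values are assumptions standing in for whatever the calling job preparer supplies (see the call sites in the files below):

    DataSourceCheckEngine checkEngine = new DataSourceCheckEngine(databaseType);
    // Source side: verifies connectivity, privileges, and database variables.
    checkEngine.checkSourceDataSource(sourceDataSource);
    // Target side: verifies connectivity and that the target tables are empty.
    checkEngine.checkTargetDataSource(targetDataSource, importerConfig);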

File 2 of 5: PipelineJobPreparer.java
@@ -22,10 +22,8 @@
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
@@ -38,7 +36,6 @@

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Optional;

/**
@@ -54,7 +51,7 @@ public final class PipelineJobPreparer {
* Get incremental position.
*
* @param initIncremental init incremental
* @param dumperContext dumper config
* @param dumperContext incremental dumper context
* @param dataSourceManager data source manager
* @return ingest position
* @throws SQLException SQL exception
@@ -71,37 +68,6 @@ public IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress
return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId());
}

/**
* Check data source.
*
* @param dataSources data source
*/
public void checkSourceDataSource(final Collection<? extends DataSource> dataSources) {
if (dataSources.isEmpty()) {
return;
}
DataSourceCheckEngine checkEngine = new DataSourceCheckEngine(databaseType);
checkEngine.checkConnection(dataSources);
checkEngine.checkPrivilege(dataSources);
checkEngine.checkVariable(dataSources);
}

/**
* Check target data source.
*
* @param importerConfig importer config
* @param targetDataSources target data sources
*/
public void checkTargetDataSource(final ImporterConfiguration importerConfig, final Collection<? extends DataSource> targetDataSources) {
if (null == targetDataSources || targetDataSources.isEmpty()) {
log.info("target data source is empty, skip check");
return;
}
DataSourceCheckEngine dataSourceCheckEngine = new DataSourceCheckEngine(databaseType);
dataSourceCheckEngine.checkConnection(targetDataSources);
dataSourceCheckEngine.checkTargetTable(targetDataSources, importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
}

/**
* Cleanup job preparer.
*

File 3 of 5: CDC job class (buildDumperContext / getTransmissionJobItemProgress)
@@ -196,9 +196,8 @@ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig
jobConfig.getJobId(), jobConfig.isDecodeWithTX());
}

private static TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig,
final PipelineDataSourceManager dataSourceManager,
final IncrementalDumperContext incrementalDumperContext) throws SQLException {
private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager,
final IncrementalDumperContext incrementalDumperContext) throws SQLException {
TransmissionJobItemProgress result = new TransmissionJobItemProgress();
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());

File 4 of 5: CDC job preparer (initIncrementalPosition)
@@ -105,8 +105,9 @@ private void initIncrementalPosition(final CDCJobItemContext jobItemContext) {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
try {
taskConfig.getDumperContext().getCommonContext().setPosition(new PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())
.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
IngestPosition position = new PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
taskConfig.getDumperContext().getCommonContext().setPosition(position);
} catch (final SQLException ex) {
throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
}

File 5 of 5: migration job preparer (prepare / prepareAndCheckTarget)
@@ -20,6 +20,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
@@ -34,6 +35,7 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
@@ -74,7 +76,6 @@

import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
@@ -100,7 +101,7 @@ public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
new PipelineJobPreparer(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(jobItemContext.getSourceDataSource());
if (jobItemContext.isStopping()) {
PipelineJobRegistry.stop(jobItemContext.getJobId());
return;
@@ -156,21 +157,19 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext
}

private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
DatabaseType targetDatabaseType = jobItemContext.getJobConfig().getTargetDatabaseType();
if (jobItemContext.isSourceTargetDatabaseTheSame()) {
prepareTarget(jobItemContext);
prepareTarget(jobItemContext, targetDatabaseType);
}
TransmissionJobItemProgress initProgress = jobItemContext.getInitProgress();
if (null == initProgress) {
if (null == jobItemContext.getInitProgress()) {
PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
new PipelineJobPreparer(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSource(
jobItemContext.getTaskConfig().getImporterConfig(), Collections.singleton(targetDataSource));
new DataSourceCheckEngine(targetDatabaseType).checkTargetDataSource(targetDataSource, jobItemContext.getTaskConfig().getImporterConfig());
}
}

private void prepareTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
private void prepareTarget(final MigrationJobItemContext jobItemContext, final DatabaseType targetDatabaseType) throws SQLException {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
DatabaseType targetDatabaseType = jobItemContext.getJobConfig().getTargetDatabaseType();
PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class, targetDatabaseType));
preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
@@ -184,8 +183,9 @@ private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
try {
taskConfig.getDumperContext().getCommonContext().setPosition(new PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())
.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
IngestPosition position = new PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
taskConfig.getDumperContext().getCommonContext().setPosition(position);
} catch (final SQLException ex) {
throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
}
