diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
index 1ee726ba237b8..c3b8886940d26 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
@@ -33,8 +33,6 @@
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
-import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
-import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
@@ -42,7 +40,6 @@
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
@@ -54,7 +51,6 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
@@ -88,9 +84,6 @@
 import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
 import org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry;
 
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -297,40 +290,6 @@ public Optional<String> getToBeStoppedPreviousJobType() {
         return Optional.of("CONSISTENCY_CHECK");
     }
 
-    private void dropCheckJobs(final String jobId) {
-        Collection<String> checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId);
-        if (checkJobIds.isEmpty()) {
-            return;
-        }
-        for (String each : checkJobIds) {
-            try {
-                new PipelineJobManager(this).drop(each);
-                // CHECKSTYLE:OFF
-            } catch (final RuntimeException ex) {
-                // CHECKSTYLE:ON
-                log.info("drop check job failed, check job id: {}, error: {}", each, ex.getMessage());
-            }
-        }
-    }
-
-    private void cleanTempTableOnRollback(final String jobId) throws SQLException {
-        MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
-        PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
-        TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
-        try (
-                PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
-                Connection connection = dataSource.getConnection()) {
-            for (String each : jobConfig.getTargetTableNames()) {
-                String targetSchemaName = mapping.getSchemaName(each);
-                String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, each);
-                log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, each, sql);
-                try (Statement statement = connection.createStatement()) {
-                    statement.execute(sql);
-                }
-            }
-        }
-    }
-
     /**
      * Add migration source resources.
     *