Skip to content

Commit

Permalink
Refactor MigrationJobAPI
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 30, 2023
1 parent 4facabd commit 3426243
Showing 1 changed file with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,18 @@
@Slf4j
public final class MigrationJobAPI implements TransmissionJobAPI {

private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
private final PipelineJobManager jobManager;

private final PipelineJobOption jobOption = new MigrationJobOption();
private final PipelineJobConfigurationManager jobConfigManager;

private final PipelineDataSourcePersistService dataSourcePersistService;

public MigrationJobAPI() {
PipelineJobOption jobOption = new MigrationJobOption();
jobManager = new PipelineJobManager(jobOption);
jobConfigManager = new PipelineJobConfigurationManager(jobOption);
dataSourcePersistService = new PipelineDataSourcePersistService();
}

/**
* Start migration job.
Expand All @@ -106,7 +115,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
*/
public String start(final PipelineContextKey contextKey, final MigrateTableStatement param) {
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
new PipelineJobManager(jobOption).start(jobConfig);
jobManager.start(jobConfig);
return jobConfig.getJobId();
}

Expand Down Expand Up @@ -269,10 +278,9 @@ private String getStandardProperty(final Map<String, Object> standardProps, fina
public void commit(final String jobId) {
log.info("Commit job {}", jobId);
final long startTimeMillis = System.currentTimeMillis();
PipelineJobManager jobManager = new PipelineJobManager(jobOption);
jobManager.stop(jobId);
dropCheckJobs(jobId);
MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
Expand All @@ -290,7 +298,7 @@ public void rollback(final String jobId) throws SQLException {
final long startTimeMillis = System.currentTimeMillis();
dropCheckJobs(jobId);
cleanTempTableOnRollback(jobId);
new PipelineJobManager(jobOption).drop(jobId);
jobManager.drop(jobId);
log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);
}

Expand All @@ -301,7 +309,7 @@ private void dropCheckJobs(final String jobId) {
}
for (String each : checkJobIds) {
try {
new PipelineJobManager(jobOption).drop(each);
jobManager.drop(each);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
Expand Down

0 comments on commit 3426243

Please sign in to comment.