Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor TransmissionJobAPI #29197

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,20 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
/**
* Build task configuration.
*
* @param pipelineJobConfig pipeline job configuration
* @param jobConfig pipeline job configuration
* @param jobShardingItem job sharding item
* @param pipelineProcessConfig pipeline process configuration
* @param processConfig pipeline process configuration
* @return task configuration
*/
PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration processConfig);

/**
* Build pipeline process context.
* Build transmission process context.
*
* @param pipelineJobConfig pipeline job configuration
* @return pipeline process context
* @param jobConfig pipeline job configuration
* @return transmission process context
*/
TransmissionProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
TransmissionProcessContext buildProcessContext(PipelineJobConfiguration jobConfig);

/**
* Extend YAML job configuration.
Expand All @@ -78,13 +78,12 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
/**
* Build pipeline data consistency checker.
*
* @param pipelineJobConfig job configuration
* @param jobConfig job configuration
* @param processContext process context
* @param progressContext consistency check job item progress context
* @return all logic tables check result
*/
PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, TransmissionProcessContext processContext,
ConsistencyCheckJobItemProgressContext progressContext);
PipelineDataConsistencyChecker buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, TransmissionProcessContext processContext, ConsistencyCheckJobItemProgressContext progressContext);

/**
* Commit pipeline job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina
}

@Override
public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper);
ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig);
log.debug("buildTaskConfiguration, result={}", result);
return result;
Expand Down Expand Up @@ -277,9 +277,9 @@ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfigurati
}

@Override
public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
public CDCProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
TransmissionJobManager jobManager = new TransmissionJobManager(this);
return new CDCProcessContext(pipelineJobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
return new CDCProcessContext(jobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
}

@Override
Expand Down Expand Up @@ -325,8 +325,8 @@ public void rollback(final String jobId) throws SQLException {
}

@Override
public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final TransmissionProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void execute(final ShardingContext shardingContext) {

private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
CDCProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
CDCProcessContext jobProcessContext = jobAPI.buildProcessContext(jobConfig);
CDCTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ protected void runBlocking() {
TransmissionJobAPI jobAPI = (TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(parentJobId);
try {
PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker(
parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext());
PipelineDataConsistencyChecker checker = jobAPI.buildDataConsistencyChecker(
parentJobConfig, jobAPI.buildProcessContext(parentJobConfig), jobItemContext.getProgressContext());
consistencyChecker.set(checker);
Map<String, TableDataConsistencyCheckResult> checkResultMap = checker.check(checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
log.info("job {} with check algorithm '{}' data consistency checker result: {}, stopping: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingC
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
MigrationProcessContext jobProcessContext = jobAPI.buildProcessContext(jobConfig);
MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() {
}

@Override
public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(
jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
Expand All @@ -240,7 +240,7 @@ public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfig
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
ImporterConfiguration importerConfig = buildImporterConfiguration(
jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
MigrationTaskConfiguration result = new MigrationTaskConfiguration(
incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
log.info("buildTaskConfiguration, result={}", result);
Expand Down Expand Up @@ -275,15 +275,15 @@ private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfi
}

@Override
public MigrationProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()));
return new MigrationProcessContext(pipelineJobConfig.getJobId(), processConfig);
public MigrationProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
return new MigrationProcessContext(jobConfig.getJobId(), processConfig);
}

@Override
public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final TransmissionProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext, progressContext);
public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
return new MigrationDataConsistencyChecker((MigrationJobConfiguration) jobConfig, processContext, progressContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ void assertDataConsistencyCheck() {
initTableData(jobConfig);
Optional<String> jobId = jobManager.start(jobConfig);
assertTrue(jobId.isPresent());
Map<String, TableDataConsistencyCheckResult> checkResultMap = jobAPI.buildPipelineDataConsistencyChecker(
jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", null);
Map<String, TableDataConsistencyCheckResult> checkResultMap = jobAPI.buildDataConsistencyChecker(
jobConfig, jobAPI.buildProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", null);
assertThat(checkResultMap.size(), is(1));
String checkKey = "t_order";
assertTrue(checkResultMap.get(checkKey).isMatched());
Expand Down