Skip to content

Commit

Permalink
Refactor PipelineJobAPI.getJobConfiguration()
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 16, 2023
1 parent 1198efc commit 80ea4ca
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@
@SingletonSPI
public interface PipelineJobAPI extends TypedSPI {

/**
* Get job configuration.
*
* @param jobId job id
* @return job configuration
*/
PipelineJobConfiguration getJobConfiguration(String jobId);

/**
* Get job configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
Expand Down Expand Up @@ -46,7 +47,7 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme
String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
MigrationJobConfiguration jobConfig = migrationJobAPI.getJobConfiguration(jobId);
MigrationJobConfiguration jobConfig = migrationJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
verifyInventoryFinished(jobConfig);
checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,6 @@ public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfigurat
return new CDCProcessContext(pipelineJobConfig.getJobId(), showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
}

@Override
public CDCJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}

@Override
public CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
Expand Down Expand Up @@ -78,7 +79,7 @@ public final class CDCBackendHandler {
* @return database
*/
public String getDatabaseNameByJobId(final String jobId) {
return jobAPI.getJobConfiguration(jobId).getDatabaseName();
return jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName();
}

/**
Expand Down Expand Up @@ -126,7 +127,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod
* @param connectionContext connection context
*/
public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
if (PipelineJobCenter.isJobExisting(jobId)) {
PipelineJobCenter.stop(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result
}

private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) {
ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(checkJobId);
ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
if (null != jobConfig.getAlgorithmProps()) {
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(",")));
Expand All @@ -329,11 +329,6 @@ private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo
}
}

@Override
public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}

@Override
public ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected void runBlocking() {
checkJobAPI.persistJobItemProgress(jobItemContext);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(parentJobId);
PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
try {
PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker(
parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,6 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina
}
}

@Override
public MigrationJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}

@Override
public MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
Expand Down Expand Up @@ -328,7 +323,7 @@ private void dropCheckJobs(final String jobId) {
}

private void cleanTempTableOnRollback(final String jobId) throws SQLException {
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
Expand All @@ -352,7 +347,7 @@ public void commit(final String jobId) {
PipelineJobManager jobManager = new PipelineJobManager(this);
jobManager.stop(jobId);
dropCheckJobs(jobId);
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void assertCreateJobConfig() {
String parentJobId = parentJobConfig.getJobId();
String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
ConsistencyCheckJobConfiguration checkJobConfig = checkJobAPI.getJobConfiguration(checkJobId);
ConsistencyCheckJobConfiguration checkJobConfig = checkJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal();
assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void assertStartOrStopById() {
void assertRollback() throws SQLException {
Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
Expand All @@ -154,7 +154,7 @@ void assertRollback() throws SQLException {
void assertCommit() {
Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
Expand Down Expand Up @@ -277,7 +277,7 @@ void assertCreateJobConfig() throws SQLException {
initIntPrimaryEnvironment();
SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
MigrationJobConfiguration actual = jobAPI.getJobConfiguration(jobId);
MigrationJobConfiguration actual = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
assertThat(dataNodeLines.size(), is(1));
Expand Down

0 comments on commit 80ea4ca

Please sign in to comment.