Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor PipelineJobAPI.getJobConfiguration() #29059

Merged
merged 1 commit into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@
@SingletonSPI
public interface PipelineJobAPI extends TypedSPI {

/**
* Get job configuration.
*
* @param jobId job id
* @return job configuration
*/
PipelineJobConfiguration getJobConfiguration(String jobId);

/**
* Get job configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
Expand Down Expand Up @@ -46,7 +47,7 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme
String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
MigrationJobConfiguration jobConfig = migrationJobAPI.getJobConfiguration(jobId);
MigrationJobConfiguration jobConfig = migrationJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
verifyInventoryFinished(jobConfig);
checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,6 @@ public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfigurat
return new CDCProcessContext(pipelineJobConfig.getJobId(), showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
}

@Override
public CDCJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}

@Override
public CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
Expand Down Expand Up @@ -78,7 +79,7 @@ public final class CDCBackendHandler {
* @return database
*/
public String getDatabaseNameByJobId(final String jobId) {
return jobAPI.getJobConfiguration(jobId).getDatabaseName();
return jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName();
}

/**
Expand Down Expand Up @@ -126,7 +127,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod
* @param connectionContext connection context
*/
public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
if (PipelineJobCenter.isJobExisting(jobId)) {
PipelineJobCenter.stop(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result
}

private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) {
ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(checkJobId);
ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
if (null != jobConfig.getAlgorithmProps()) {
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(",")));
Expand All @@ -329,11 +329,6 @@ private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo
}
}

@Override
public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}

@Override
public ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected void runBlocking() {
checkJobAPI.persistJobItemProgress(jobItemContext);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(parentJobId);
PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
try {
PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker(
parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,6 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina
}
}

@Override
public MigrationJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}

@Override
public MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
Expand Down Expand Up @@ -328,7 +323,7 @@ private void dropCheckJobs(final String jobId) {
}

private void cleanTempTableOnRollback(final String jobId) throws SQLException {
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
Expand All @@ -352,7 +347,7 @@ public void commit(final String jobId) {
PipelineJobManager jobManager = new PipelineJobManager(this);
jobManager.stop(jobId);
dropCheckJobs(jobId);
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void assertCreateJobConfig() {
String parentJobId = parentJobConfig.getJobId();
String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
ConsistencyCheckJobConfiguration checkJobConfig = checkJobAPI.getJobConfiguration(checkJobId);
ConsistencyCheckJobConfiguration checkJobConfig = checkJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal();
assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void assertStartOrStopById() {
void assertRollback() throws SQLException {
Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
Expand All @@ -154,7 +154,7 @@ void assertRollback() throws SQLException {
void assertCommit() {
Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
Expand Down Expand Up @@ -277,7 +277,7 @@ void assertCreateJobConfig() throws SQLException {
initIntPrimaryEnvironment();
SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
MigrationJobConfiguration actual = jobAPI.getJobConfiguration(jobId);
MigrationJobConfiguration actual = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
assertThat(dataNodeLines.size(), is(1));
Expand Down