diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java index e31ee4796b233..764866c717e7a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java @@ -90,9 +90,8 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte */ public List getJobItemInfos(final String jobId) { PipelineJobManager jobManager = new PipelineJobManager(jobAPI); - JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); - PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobConfigPOJO); - long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0")); + PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId); + long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0")); InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI); Map jobProgress = inventoryIncrementalJobManager.getJobProgress(jobConfig); List result = new LinkedList<>(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index 0e8d2a2d6826f..b01ce97c1f25c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -57,13 +57,13 @@ public final class PipelineJobManager { /** * Get job configuration. * - * @param jobConfigPOJO job configuration POJO + * @param jobId job ID * @param type of pipeline job configuration * @return pipeline job configuration */ @SuppressWarnings("unchecked") - public T getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { - return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); + public T getJobConfiguration(final String jobId) { + return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter()); } /** diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java index f59ed72f6a6f1..1bf18932eccfe 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.migration.distsql.handler.update; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; -import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; @@ -49,7 +48,7 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName(); Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps(); String jobId = sqlStatement.getJobId(); - MigrationJobConfiguration jobConfig = new PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration jobConfig = new PipelineJobManager(migrationJobAPI).getJobConfiguration(jobId); verifyInventoryFinished(jobConfig); checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType())); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 6f711ab199211..0e41d2f8cac04 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -289,9 +289,8 @@ public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() { @Override public TableBasedPipelineJobInfo getJobInfo(final String jobId) { - JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); - PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO); - CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO); + PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId); return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames())); } @@ -305,9 +304,8 @@ public void commit(final String jobId) { * @param jobId job id */ public void dropStreaming(final String jobId) { - JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); - CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO); - ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active")); + CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId); + ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active")); new PipelineJobManager(this).drop(jobId); cleanup(jobConfig); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java index 1f1a9473cd42e..05070a7a901ef 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java @@ -47,7 +47,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; -import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; @@ -82,7 +81,7 @@ public final class CDCBackendHandler { * @return database */ public String getDatabaseNameByJobId(final String jobId) { - return jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName(); + return jobManager.getJobConfiguration(jobId).getDatabaseName(); } /** @@ -130,7 +129,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod * @param connectionContext connection context */ public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) { - CDCJobConfiguration cdcJobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + CDCJobConfiguration cdcJobConfig = jobManager.getJobConfiguration(jobId); ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId)); if (PipelineJobCenter.isJobExisting(jobId)) { PipelineJobCenter.stop(jobId); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 51f8e5edc113f..63e6ff424362a 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -268,7 +268,7 @@ private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result } private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) { - ConsistencyCheckJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); + ConsistencyCheckJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(checkJobId); result.setAlgorithmType(jobConfig.getAlgorithmTypeName()); if (null != jobConfig.getAlgorithmProps()) { result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(","))); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 8259022cf89ce..136e92b3dc5bf 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -103,7 +103,7 @@ protected void runBlocking() { jobItemManager.persistProgress(jobItemContext); JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId); InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType()); - PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId)); + PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(parentJobId); try { PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker( parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext()); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index c5d551a55ce89..6ffa6371715b4 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -67,7 +67,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext; import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper; -import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.config.rule.RuleConfiguration; import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties; import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser; @@ -211,11 +210,10 @@ private Map buildTargetTableSchemaMap(final Map sourceTables = new LinkedList<>(); - new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes() - .forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); + new PipelineJobManager(this).getJobConfiguration(jobId).getJobShardingDataNodes() + .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); return new TableBasedPipelineJobInfo(jobMetaData, String.join(",", sourceTables)); } @@ -324,7 +322,7 @@ private void dropCheckJobs(final String jobId) { } private void cleanTempTableOnRollback(final String jobId) throws SQLException { - MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType()); TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); try ( @@ -348,7 +346,7 @@ public void commit(final String jobId) { PipelineJobManager jobManager = new PipelineJobManager(this); jobManager.stop(jobId); dropCheckJobs(jobId); - MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId); refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName()); jobManager.drop(jobId); log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java index 7762178b079e7..fe9ea2db83c66 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java @@ -69,7 +69,7 @@ void assertCreateJobConfig() { String parentJobId = parentJobConfig.getJobId(); String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType())); - ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); + ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(checkJobId); int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE; String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal(); assertThat(checkJobConfig.getJobId(), is(expectCheckJobId)); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 3e67089680ae4..84a4e55572e71 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -150,7 +150,7 @@ void assertStartOrStopById() { void assertRollback() throws SQLException { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); + MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId.get()); initTableData(jobConfig); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); @@ -162,7 +162,7 @@ void assertRollback() throws SQLException { void assertCommit() { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); + MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId.get()); initTableData(jobConfig); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); @@ -285,7 +285,7 @@ void assertCreateJobConfig() throws SQLException { initIntPrimaryEnvironment(); SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order"); String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db")); - MigrationJobConfiguration actual = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration actual = jobManager.getJobConfiguration(jobId); assertThat(actual.getTargetDatabaseName(), is("logic_db")); List dataNodeLines = actual.getJobShardingDataNodes(); assertThat(dataNodeLines.size(), is(1));