diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java index 19d71a8d79113..278c99744b566 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java @@ -106,12 +106,12 @@ public static String getJobOffsetPath(final String jobId) { } /** - * Get job config path. + * Get job configuration path. * * @param jobId job id * @return job configuration path */ - public static String getJobConfigPath(final String jobId) { + public static String getJobConfigurationPath(final String jobId) { return String.join("/", getJobRootPath(jobId), "config"); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java index 2a9e0447fbba2..83904a361a910 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java @@ -32,12 +32,12 @@ public interface GovernanceRepositoryAPI { /** - * Whether key existing or not. + * Whether job configuration existed. * - * @param key registry center key - * @return true if job exists, else false + * @param jobId jobId + * @return job configuration exist or not */ - boolean isExisted(String key); + boolean isJobConfigurationExisted(String jobId); /** * Persist job offset info. diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java index 9071709e74551..9e12ee8d9a6da 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java @@ -51,9 +51,8 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP private final ClusterPersistRepository repository; @Override - public boolean isExisted(final String key) { - // TODO delegate to repository isExisted - return null != repository.getDirectly(key); + public boolean isJobConfigurationExisted(final String jobId) { + return null != repository.getDirectly(PipelineMetaDataNode.getJobConfigurationPath(jobId)); } @Override diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index b01ce97c1f25c..e8e1c82148112 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -76,13 +76,12 @@ public Optional start(final PipelineJobConfiguration jobConfig) { String jobId = jobConfig.getJobId(); ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId)); GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)); - String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId); - if (repositoryAPI.isExisted(jobConfigKey)) { - log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey); + if (repositoryAPI.isJobConfigurationExisted(jobId)) { + log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId); return Optional.of(jobId); } repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getJobClass().getName()); - repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO())); + repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO())); return Optional.of(jobId); } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java index d33cf56282cc5..0dbbc688841db 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java @@ -67,7 +67,7 @@ void assertGetJobOffsetItemPath() { @Test void assertGetJobConfigPath() { - assertThat(PipelineMetaDataNode.getJobConfigPath(jobId), is(jobRootPath + "/config")); + assertThat(PipelineMetaDataNode.getJobConfigurationPath(jobId), is(jobRootPath + "/config")); } @Test diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 405d0f5a5ad84..17c3daa7a1fbf 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -122,14 +122,13 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig); ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId())); GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())); - String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobConfig.getJobId()); - if (repositoryAPI.isExisted(jobConfigKey)) { - log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey); + if (repositoryAPI.isJobConfigurationExisted(jobConfig.getJobId())) { + log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId()); } else { repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClass().getName()); JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO(); jobConfigPOJO.setDisabled(true); - repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO)); + repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobConfig.getJobId()), YamlEngine.marshal(jobConfigPOJO)); if (!param.isFull()) { initIncrementalPosition(jobConfig); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java index 60e31f0811c44..9c5bca8cf1d9d 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java @@ -80,11 +80,10 @@ private static void watch() { } @Test - void assertIsExisted() { - String testKey = "/testKey1"; - assertFalse(governanceRepositoryAPI.isExisted(testKey)); - governanceRepositoryAPI.persist(testKey, "testValue1"); - assertTrue(governanceRepositoryAPI.isExisted(testKey)); + void assertIsJobConfigurationExisted() { + assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job")); + governanceRepositoryAPI.persist("/pipeline/jobs/foo_job/config", "foo"); + assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job")); } @Test