Skip to content

Commit

Permalink
Rename GovernanceRepositoryAPI.isJobConfigurationExisted() (#29115)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 21, 2023
1 parent ae597f4 commit 2715b71
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ public static String getJobOffsetPath(final String jobId) {
}

/**
* Get job config path.
* Get job configuration path.
*
* @param jobId job id
* @return job configuration path
*/
public static String getJobConfigPath(final String jobId) {
public static String getJobConfigurationPath(final String jobId) {
return String.join("/", getJobRootPath(jobId), "config");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
public interface GovernanceRepositoryAPI {

/**
* Whether key existing or not.
* Whether job configuration existed.
*
* @param key registry center key
* @return true if job exists, else false
* @param jobId jobId
* @return job configuration exist or not
*/
boolean isExisted(String key);
boolean isJobConfigurationExisted(String jobId);

/**
* Persist job offset info.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
private final ClusterPersistRepository repository;

@Override
public boolean isExisted(final String key) {
// TODO delegate to repository isExisted
return null != repository.getDirectly(key);
public boolean isJobConfigurationExisted(final String jobId) {
return null != repository.getDirectly(PipelineMetaDataNode.getJobConfigurationPath(jobId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
if (repositoryAPI.isJobConfigurationExisted(jobId)) {
log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
return Optional.of(jobId);
}
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getJobClass().getName());
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
return Optional.of(jobId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void assertGetJobOffsetItemPath() {

@Test
void assertGetJobConfigPath() {
assertThat(PipelineMetaDataNode.getJobConfigPath(jobId), is(jobRootPath + "/config"));
assertThat(PipelineMetaDataNode.getJobConfigurationPath(jobId), is(jobRootPath + "/config"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,13 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT
CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobConfig.getJobId());
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
if (repositoryAPI.isJobConfigurationExisted(jobConfig.getJobId())) {
log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
} else {
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClass().getName());
JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO();
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobConfig.getJobId()), YamlEngine.marshal(jobConfigPOJO));
if (!param.isFull()) {
initIncrementalPosition(jobConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ private static void watch() {
}

@Test
void assertIsExisted() {
String testKey = "/testKey1";
assertFalse(governanceRepositoryAPI.isExisted(testKey));
governanceRepositoryAPI.persist(testKey, "testValue1");
assertTrue(governanceRepositoryAPI.isExisted(testKey));
void assertIsJobConfigurationExisted() {
assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
governanceRepositoryAPI.persist("/pipeline/jobs/foo_job/config", "foo");
assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
}

@Test
Expand Down

0 comments on commit 2715b71

Please sign in to comment.