Skip to content

Commit

Permalink
Refactor GovernanceRepositoryAPI
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 21, 2023
1 parent fbc262e commit 000e573
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;

import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;

import java.util.Collection;
Expand Down Expand Up @@ -154,12 +156,20 @@ public interface GovernanceRepositoryAPI {
void deleteJob(String jobId);

/**
* Persist data.
* Persist job root info.
*
* @param key key of data
* @param value value of data
* @param jobId job ID
* @param jobClass job class
*/
void persistJobRootInfo(String jobId, Class<? extends PipelineJob> jobClass);

/**
* Persist job configuration.
*
* @param jobId job ID
* @param jobConfigPOJO job configuration POJO
*/
void persist(String key, String value);
void persistJobConfiguration(String jobId, JobConfigurationPOJO jobConfigPOJO);

/**
* Update job item error message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
Expand All @@ -28,6 +29,7 @@
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
Expand Down Expand Up @@ -148,8 +150,13 @@ public void deleteJob(final String jobId) {
}

@Override
public void persist(final String key, final String value) {
repository.persist(key, value);
public void persistJobRootInfo(final String jobId, final Class<? extends PipelineJob> jobClass) {
repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobClass.getName());
}

@Override
public void persistJobConfiguration(final String jobId, final JobConfigurationPOJO jobConfigPOJO) {
repository.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfigPOJO));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;

Expand Down Expand Up @@ -67,6 +66,6 @@ private String buildErrorMessage(final Object error) {
* Clean job item error message.
*/
public void cleanErrorMessage() {
governanceRepositoryAPI.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), "");
governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -80,8 +79,8 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
return Optional.of(jobId);
}
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getJobClass().getName());
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
repositoryAPI.persistJobRootInfo(jobId, jobAPI.getJobClass());
repositoryAPI.persistJobConfiguration(jobId, jobConfig.convertToJobConfigurationPOJO());
return Optional.of(jobId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
Expand Down Expand Up @@ -125,10 +124,10 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT
if (repositoryAPI.isJobConfigurationExisted(jobConfig.getJobId())) {
log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
} else {
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClass().getName());
repositoryAPI.persistJobRootInfo(jobConfig.getJobId(), getJobClass());
JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO();
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobConfig.getJobId()), YamlEngine.marshal(jobConfigPOJO));
repositoryAPI.persistJobConfiguration(jobConfig.getJobId(), jobConfigPOJO);
if (!param.isFull()) {
initIncrementalPosition(jobConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.test.it.data.pipeline.core.job.service;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
Expand All @@ -32,6 +33,8 @@
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -82,7 +85,7 @@ private static void watch() {
@Test
void assertIsJobConfigurationExisted() {
assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
governanceRepositoryAPI.persist("/pipeline/jobs/foo_job/config", "foo");
getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", "foo");
assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
}

Expand Down Expand Up @@ -114,7 +117,7 @@ void assertPersistJobCheckResult() {

@Test
void assertDeleteJob() {
governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
getClusterPersistRepository().persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
governanceRepositoryAPI.deleteJob("1");
Optional<String> actual = governanceRepositoryAPI.getJobItemProgress("1", 0);
assertFalse(actual.isPresent());
Expand All @@ -123,7 +126,7 @@ void assertDeleteJob() {
@Test
void assertWatch() throws InterruptedException {
String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1";
governanceRepositoryAPI.persist(key, "");
getClusterPersistRepository().persist(key, "");
boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS);
assertTrue(awaitResult);
DataChangedEvent event = EVENT_ATOMIC_REFERENCE.get();
Expand Down Expand Up @@ -159,6 +162,11 @@ void assertLatestCheckJobIdPersistenceDeletion() {
assertFalse(governanceRepositoryAPI.getLatestCheckJobId(parentJobId).isPresent(), "Expected no checkJobId to be present after deletion");
}

private ClusterPersistRepository getClusterPersistRepository() {
ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
return (ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository();
}

private MigrationJobItemContext mockJobItemContext() {
MigrationJobItemContext result = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
MigrationTaskConfiguration taskConfig = result.getTaskConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check.consistency;

import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
Expand All @@ -32,6 +33,8 @@
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -59,7 +62,7 @@ void assertCountAndDataCheck() throws SQLException {
jobConfigurationPOJO.setJobName(jobConfig.getJobId());
jobConfigurationPOJO.setShardingTotalCount(1);
GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");
Map<String, TableDataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null);
Expand All @@ -68,6 +71,11 @@ void assertCountAndDataCheck() throws SQLException {
assertTrue(actual.get(checkKey).isMatched());
}

private ClusterPersistRepository getClusterPersistRepository() {
ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
return (ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository();
}

private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext(final String jobId) {
return new ConsistencyCheckJobItemProgressContext(jobId, 0, "H2");
}
Expand Down

0 comments on commit 000e573

Please sign in to comment.