Skip to content

Commit

Permalink
Refactor GovernanceRepositoryAPI (#29117)
Browse files Browse the repository at this point in the history
* Remove GovernanceRepositoryAPI.getChildrenKeys()

* Remove GovernanceRepositoryAPI.watchPipeLineRootPath()

* Rename GovernanceRepositoryAPI.watchPipeLineRootPath()

* Rename GovernanceRepositoryAPI.updateJobItemErrorMessage()

* Refactor GovernanceRepositoryAPI
  • Loading branch information
terrymanu authored Nov 21, 2023
1 parent 2715b71 commit 2a7c5f9
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public final class PipelineMetaDataNodeWatcher {
private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) {
listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class)
.stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern, each -> each)));
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(PipelineNodePath.DATA_PIPELINE_ROOT, this::dispatchEvent);
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watchPipeLineRootPath(this::dispatchEvent);
}

private void dispatchEvent(final DataChangedEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;

import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;

import java.util.Collection;
Expand All @@ -31,6 +33,13 @@
*/
public interface GovernanceRepositoryAPI {

/**
* Watch pipeLine root path.
*
* @param listener data changed event listener
*/
void watchPipeLineRootPath(DataChangedEventListener listener);

/**
* Whether job configuration existed.
*
Expand Down Expand Up @@ -147,36 +156,29 @@ public interface GovernanceRepositoryAPI {
void deleteJob(String jobId);

/**
* Get node's sub-nodes list.
*
* @param key key of data
* @return sub-nodes name list
*/
List<String> getChildrenKeys(String key);

/**
* Watch key or path of governance server.
* Persist job root info.
*
* @param key key of data
* @param listener data changed event listener
* @param jobId job ID
* @param jobClass job class
*/
void watch(String key, DataChangedEventListener listener);
void persistJobRootInfo(String jobId, Class<? extends PipelineJob> jobClass);

/**
* Persist data.
*
* @param key key of data
* @param value value of data
* Persist job configuration.
*
* @param jobId job ID
* @param jobConfigPOJO job configuration POJO
*/
void persist(String key, String value);
void persistJobConfiguration(String jobId, JobConfigurationPOJO jobConfigPOJO);

/**
* Update data.
* Update job item error message.
*
* @param key key of data
* @param value value of data
* @param jobId job ID
* @param shardingItem sharding item
* @param errorMessage error message
*/
void update(String key, String value);
void updateJobItemErrorMessage(String jobId, int shardingItem, String errorMessage);

/**
* Get sharding items of job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
Expand All @@ -50,6 +53,11 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP

private final ClusterPersistRepository repository;

@Override
public void watchPipeLineRootPath(final DataChangedEventListener listener) {
repository.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener);
}

@Override
public boolean isJobConfigurationExisted(final String jobId) {
return null != repository.getDirectly(PipelineMetaDataNode.getJobConfigurationPath(jobId));
Expand Down Expand Up @@ -142,28 +150,23 @@ public void deleteJob(final String jobId) {
}

@Override
public List<String> getChildrenKeys(final String key) {
return repository.getChildrenKeys(key);
}

@Override
public void watch(final String key, final DataChangedEventListener listener) {
repository.watch(key, listener);
public void persistJobRootInfo(final String jobId, final Class<? extends PipelineJob> jobClass) {
repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobClass.getName());
}

@Override
public void persist(final String key, final String value) {
repository.persist(key, value);
public void persistJobConfiguration(final String jobId, final JobConfigurationPOJO jobConfigPOJO) {
repository.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfigPOJO));
}

@Override
public void update(final String key, final String value) {
repository.update(key, value);
public void updateJobItemErrorMessage(final String jobId, final int shardingItem, final String errorMessage) {
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), errorMessage);
}

@Override
public List<Integer> getShardingItems(final String jobId) {
List<String> result = getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
List<String> result = repository.getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
return result.stream().map(Integer::parseInt).collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;

Expand Down Expand Up @@ -56,7 +55,7 @@ public String getErrorMessage() {
* @param error error
*/
public void updateErrorMessage(final Object error) {
governanceRepositoryAPI.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), null == error ? "" : buildErrorMessage(error));
governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, null == error ? "" : buildErrorMessage(error));
}

private String buildErrorMessage(final Object error) {
Expand All @@ -67,6 +66,6 @@ private String buildErrorMessage(final Object error) {
* Clean job item error message.
*/
public void cleanErrorMessage() {
governanceRepositoryAPI.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), "");
governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -80,8 +79,8 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
return Optional.of(jobId);
}
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getJobClass().getName());
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
repositoryAPI.persistJobRootInfo(jobId, jobAPI.getJobClass());
repositoryAPI.persistJobConfiguration(jobId, jobConfig.convertToJobConfigurationPOJO());
return Optional.of(jobId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
Expand Down Expand Up @@ -125,10 +124,10 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT
if (repositoryAPI.isJobConfigurationExisted(jobConfig.getJobId())) {
log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
} else {
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClass().getName());
repositoryAPI.persistJobRootInfo(jobConfig.getJobId(), getJobClass());
JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO();
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobConfig.getJobId()), YamlEngine.marshal(jobConfigPOJO));
repositoryAPI.persistJobConfiguration(jobConfig.getJobId(), jobConfigPOJO);
if (!param.isFull()) {
initIncrementalPosition(jobConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.test.it.data.pipeline.core.job.service;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
Expand All @@ -32,6 +33,8 @@
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -71,7 +74,7 @@ static void beforeClass() {
}

private static void watch() {
governanceRepositoryAPI.watch(PipelineNodePath.DATA_PIPELINE_ROOT, event -> {
governanceRepositoryAPI.watchPipeLineRootPath(event -> {
if ((PipelineNodePath.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) {
EVENT_ATOMIC_REFERENCE.set(event);
COUNT_DOWN_LATCH.countDown();
Expand All @@ -82,7 +85,7 @@ private static void watch() {
@Test
void assertIsJobConfigurationExisted() {
assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
governanceRepositoryAPI.persist("/pipeline/jobs/foo_job/config", "foo");
getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", "foo");
assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
}

Expand Down Expand Up @@ -114,24 +117,16 @@ void assertPersistJobCheckResult() {

@Test
void assertDeleteJob() {
governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
getClusterPersistRepository().persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
governanceRepositoryAPI.deleteJob("1");
Optional<String> actual = governanceRepositoryAPI.getJobItemProgress("1", 0);
assertFalse(actual.isPresent());
}

@Test
void assertGetChildrenKeys() {
governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
List<String> actual = governanceRepositoryAPI.getChildrenKeys(PipelineNodePath.DATA_PIPELINE_ROOT);
assertFalse(actual.isEmpty());
assertTrue(actual.contains("1"));
}

@Test
void assertWatch() throws InterruptedException {
String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1";
governanceRepositoryAPI.persist(key, "");
getClusterPersistRepository().persist(key, "");
boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS);
assertTrue(awaitResult);
DataChangedEvent event = EVENT_ATOMIC_REFERENCE.get();
Expand Down Expand Up @@ -167,6 +162,11 @@ void assertLatestCheckJobIdPersistenceDeletion() {
assertFalse(governanceRepositoryAPI.getLatestCheckJobId(parentJobId).isPresent(), "Expected no checkJobId to be present after deletion");
}

private ClusterPersistRepository getClusterPersistRepository() {
ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
return (ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository();
}

private MigrationJobItemContext mockJobItemContext() {
MigrationJobItemContext result = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
MigrationTaskConfiguration taskConfig = result.getTaskConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check.consistency;

import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
Expand All @@ -32,6 +33,8 @@
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -59,7 +62,7 @@ void assertCountAndDataCheck() throws SQLException {
jobConfigurationPOJO.setJobName(jobConfig.getJobId());
jobConfigurationPOJO.setShardingTotalCount(1);
GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");
Map<String, TableDataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null);
Expand All @@ -68,6 +71,11 @@ void assertCountAndDataCheck() throws SQLException {
assertTrue(actual.get(checkKey).isMatched());
}

private ClusterPersistRepository getClusterPersistRepository() {
ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
return (ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository();
}

private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext(final String jobId) {
return new ConsistencyCheckJobItemProgressContext(jobId, 0, "H2");
}
Expand Down

0 comments on commit 2a7c5f9

Please sign in to comment.