Refactor GovernanceRepositoryAPI #29117

Merged (5 commits, Nov 21, 2023)
PipelineMetaDataNodeWatcher:

@@ -50,7 +50,7 @@ public final class PipelineMetaDataNodeWatcher {
     private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) {
         listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class)
                 .stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern, each -> each)));
-        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(PipelineNodePath.DATA_PIPELINE_ROOT, this::dispatchEvent);
+        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watchPipeLineRootPath(this::dispatchEvent);
     }
 
     private void dispatchEvent(final DataChangedEvent event) {
GovernanceRepositoryAPI (interface):

@@ -17,8 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
 
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
 import java.util.Collection;

@@ -31,6 +33,13 @@
  */
 public interface GovernanceRepositoryAPI {
 
+    /**
+     * Watch pipeLine root path.
+     *
+     * @param listener data changed event listener
+     */
+    void watchPipeLineRootPath(DataChangedEventListener listener);
+
     /**
      * Whether job configuration existed.
      *

@@ -147,36 +156,29 @@ public interface GovernanceRepositoryAPI {
     void deleteJob(String jobId);
 
     /**
-     * Get node's sub-nodes list.
-     *
-     * @param key key of data
-     * @return sub-nodes name list
-     */
-    List<String> getChildrenKeys(String key);
-
-    /**
-     * Watch key or path of governance server.
+     * Persist job root info.
      *
-     * @param key key of data
-     * @param listener data changed event listener
+     * @param jobId job ID
+     * @param jobClass job class
      */
-    void watch(String key, DataChangedEventListener listener);
+    void persistJobRootInfo(String jobId, Class<? extends PipelineJob> jobClass);
 
     /**
-     * Persist data.
-     *
-     * @param key key of data
-     * @param value value of data
+     * Persist job configuration.
+     *
+     * @param jobId job ID
+     * @param jobConfigPOJO job configuration POJO
      */
-    void persist(String key, String value);
+    void persistJobConfiguration(String jobId, JobConfigurationPOJO jobConfigPOJO);
 
     /**
-     * Update data.
+     * Update job item error message.
      *
-     * @param key key of data
-     * @param value value of data
+     * @param jobId job ID
+     * @param shardingItem sharding item
+     * @param errorMessage error message
      */
-    void update(String key, String value);
+    void updateJobItemErrorMessage(String jobId, int shardingItem, String errorMessage);
 
     /**
      * Get sharding items of job.
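The net effect on the interface: the generic key/value surface (getChildrenKeys, watch, persist, update) is gone, replaced by methods that name the governance operation. A minimal sketch of how a caller migrates; MigrationJob as the PipelineJob implementation and the helper method itself are illustrative, not taken from this PR:

    // Sketch only: MigrationJob and this helper method are assumed for illustration.
    private void registerJob(final PipelineContextKey contextKey, final String jobId, final PipelineJobConfiguration jobConfig) {
        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey);
        // Before: callers built registry paths and serialized values themselves, e.g.
        //   repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), MigrationJob.class.getName());
        // After: path construction and YAML marshalling stay behind the interface.
        repositoryAPI.persistJobRootInfo(jobId, MigrationJob.class);
        repositoryAPI.persistJobConfiguration(jobId, jobConfig.convertToJobConfigurationPOJO());
    }

After this change, only the implementation needs to know the registry layout and the serialization format.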
GovernanceRepositoryAPIImpl:

@@ -20,13 +20,16 @@
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;

@@ -50,6 +53,11 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAPI {
 
     private final ClusterPersistRepository repository;
 
+    @Override
+    public void watchPipeLineRootPath(final DataChangedEventListener listener) {
+        repository.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener);
+    }
+
     @Override
     public boolean isJobConfigurationExisted(final String jobId) {
         return null != repository.getDirectly(PipelineMetaDataNode.getJobConfigurationPath(jobId));

@@ -142,28 +150,23 @@ public void deleteJob(final String jobId) {
     }
 
     @Override
-    public List<String> getChildrenKeys(final String key) {
-        return repository.getChildrenKeys(key);
-    }
-
-    @Override
-    public void watch(final String key, final DataChangedEventListener listener) {
-        repository.watch(key, listener);
+    public void persistJobRootInfo(final String jobId, final Class<? extends PipelineJob> jobClass) {
+        repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobClass.getName());
     }
 
     @Override
-    public void persist(final String key, final String value) {
-        repository.persist(key, value);
+    public void persistJobConfiguration(final String jobId, final JobConfigurationPOJO jobConfigPOJO) {
+        repository.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfigPOJO));
     }
 
     @Override
-    public void update(final String key, final String value) {
-        repository.update(key, value);
+    public void updateJobItemErrorMessage(final String jobId, final int shardingItem, final String errorMessage) {
+        repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), errorMessage);
     }
 
     @Override
     public List<Integer> getShardingItems(final String jobId) {
-        List<String> result = getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
+        List<String> result = repository.getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
         return result.stream().map(Integer::parseInt).collect(Collectors.toList());
     }
 
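Every implementation method now derives its key from PipelineMetaDataNode, which this PR does not touch or show. The raw keys seeded by the updated tests below ("/pipeline/jobs/foo_job/config") suggest roughly the following layout; this is a sketch under that assumption, not the actual class:

    // Assumed layout, inferred from the literal test keys;
    // the real PipelineMetaDataNode may differ in detail.
    final class PipelineMetaDataNodeSketch {
        
        static String getJobRootPath(final String jobId) {
            return "/pipeline/jobs/" + jobId;
        }
        
        static String getJobConfigurationPath(final String jobId) {
            return getJobRootPath(jobId) + "/config";
        }
    }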
Job item error message handling (org.apache.shardingsphere.data.pipeline.core.job.service):

@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 

@@ -56,7 +55,7 @@ public String getErrorMessage() {
      * @param error error
      */
     public void updateErrorMessage(final Object error) {
-        governanceRepositoryAPI.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), null == error ? "" : buildErrorMessage(error));
+        governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, null == error ? "" : buildErrorMessage(error));
     }
 
     private String buildErrorMessage(final Object error) {

@@ -67,6 +66,6 @@ private String buildErrorMessage(final Object error) {
      * Clean job item error message.
      */
     public void cleanErrorMessage() {
-        governanceRepositoryAPI.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), "");
+        governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, "");
     }
 }
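One subtle behavior shift in this file: cleanErrorMessage used to persist an empty string, while the shared method goes through repository.update. Both call sites now collapse to a single form (the job ID and sharding item here are illustrative):

    // Clearing an error message is now an update with "", not a persist.
    governanceRepositoryAPI.updateJobItemErrorMessage("foo_job", 0, "");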
Pipeline job start logic:

@@ -33,7 +33,6 @@
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;

@@ -80,8 +79,8 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
             log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
             return Optional.of(jobId);
         }
-        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getJobClass().getName());
-        repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
+        repositoryAPI.persistJobRootInfo(jobId, jobAPI.getJobClass());
+        repositoryAPI.persistJobConfiguration(jobId, jobConfig.convertToJobConfigurationPOJO());
         return Optional.of(jobId);
     }
 
CDC job creation:

@@ -49,7 +49,6 @@
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;

@@ -125,10 +124,10 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkType,
         if (repositoryAPI.isJobConfigurationExisted(jobConfig.getJobId())) {
             log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
         } else {
-            repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClass().getName());
+            repositoryAPI.persistJobRootInfo(jobConfig.getJobId(), getJobClass());
             JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO();
             jobConfigPOJO.setDisabled(true);
-            repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobConfig.getJobId()), YamlEngine.marshal(jobConfigPOJO));
+            repositoryAPI.persistJobConfiguration(jobConfig.getJobId(), jobConfigPOJO);
             if (!param.isFull()) {
                 initIncrementalPosition(jobConfig);
             }
GovernanceRepositoryAPI integration test (org.apache.shardingsphere.test.it.data.pipeline.core.job.service):

@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.job.service;
 
+import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;

@@ -32,6 +33,8 @@
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;

@@ -71,7 +74,7 @@ static void beforeClass() {
     }
 
     private static void watch() {
-        governanceRepositoryAPI.watch(PipelineNodePath.DATA_PIPELINE_ROOT, event -> {
+        governanceRepositoryAPI.watchPipeLineRootPath(event -> {
             if ((PipelineNodePath.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) {
                 EVENT_ATOMIC_REFERENCE.set(event);
                 COUNT_DOWN_LATCH.countDown();

@@ -82,7 +85,7 @@ private static void watch() {
     @Test
     void assertIsJobConfigurationExisted() {
         assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
-        governanceRepositoryAPI.persist("/pipeline/jobs/foo_job/config", "foo");
+        getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", "foo");
         assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
     }
 

@@ -114,24 +117,16 @@ void assertPersistJobCheckResult() {
 
     @Test
     void assertDeleteJob() {
-        governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
+        getClusterPersistRepository().persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
         governanceRepositoryAPI.deleteJob("1");
         Optional<String> actual = governanceRepositoryAPI.getJobItemProgress("1", 0);
         assertFalse(actual.isPresent());
     }
 
     @Test
-    void assertGetChildrenKeys() {
-        governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
-        List<String> actual = governanceRepositoryAPI.getChildrenKeys(PipelineNodePath.DATA_PIPELINE_ROOT);
-        assertFalse(actual.isEmpty());
-        assertTrue(actual.contains("1"));
-    }
-
-    @Test
     void assertWatch() throws InterruptedException {
         String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1";
-        governanceRepositoryAPI.persist(key, "");
+        getClusterPersistRepository().persist(key, "");
         boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS);
         assertTrue(awaitResult);
         DataChangedEvent event = EVENT_ATOMIC_REFERENCE.get();

@@ -167,6 +162,11 @@ void assertLatestCheckJobIdPersistenceDeletion() {
         assertFalse(governanceRepositoryAPI.getLatestCheckJobId(parentJobId).isPresent(), "Expected no checkJobId to be present after deletion");
     }
 
+    private ClusterPersistRepository getClusterPersistRepository() {
+        ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
+        return (ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository();
+    }
+
     private MigrationJobItemContext mockJobItemContext() {
         MigrationJobItemContext result = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
         MigrationTaskConfiguration taskConfig = result.getTaskConfig();
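With persist and update gone from GovernanceRepositoryAPI, tests that seed arbitrary registry keys drop down to the underlying ClusterPersistRepository through the new helper. The resulting pattern, assuming the same PipelineContextUtils fixture as this test class:

    // Seed a raw key directly, then assert through the refactored API.
    ClusterPersistRepository repository = getClusterPersistRepository();
    repository.persist("/pipeline/jobs/foo_job/config", "foo");
    assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));

The same helper appears in the consistency-check test below, keeping raw registry writes confined to test setup.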
MigrationDataConsistencyChecker test (org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check.consistency):

@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check.consistency;
 
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;

@@ -32,6 +33,8 @@
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;

@@ -59,7 +62,7 @@ void assertCountAndDataCheck() throws SQLException {
         jobConfigurationPOJO.setJobName(jobConfig.getJobId());
         jobConfigurationPOJO.setShardingTotalCount(1);
         GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
-        governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
+        getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
         governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");
         Map<String, TableDataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
                 createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null);

@@ -68,6 +71,11 @@ void assertCountAndDataCheck() throws SQLException {
         assertTrue(actual.get(checkKey).isMatched());
     }
 
+    private ClusterPersistRepository getClusterPersistRepository() {
+        ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
+        return (ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository();
+    }
+
     private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext(final String jobId) {
         return new ConsistencyCheckJobItemProgressContext(jobId, 0, "H2");
     }