Refactor usage of PipelineJobAPI.getType() #29006

Merged · 1 commit · Nov 11, 2023
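This PR threads the plain job type string (the value of `PipelineJobAPI.getType()`) through the pipeline metadata APIs in place of the `JobType` object: call sites switch from `getJobType()` to `getType()`, and literals such as "MIGRATION" or "FIXTURE" replace constructed job type instances. A minimal sketch of the new calling convention, assuming only what the diff shows (the wrapper class is illustrative):

    import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
    
    public final class JobTypeRefactorSketch {
        
        public static void main(final String[] args) {
            // Before: callers constructed and passed a JobType object, e.g.
            //     PipelineMetaDataNode.getMetaDataDataSourcesPath(new MigrationJobType());
            // After: the type string is passed directly.
            System.out.println(PipelineMetaDataNode.getMetaDataDataSourcesPath("MIGRATION"));
        }
    }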
@@ -19,7 +19,6 @@
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 
 import java.util.regex.Pattern;
 
@@ -41,14 +40,14 @@ public final class PipelineMetaDataNode {
      * @param jobType job type
      * @return data sources path
      */
-    public static String getMetaDataDataSourcesPath(final JobType jobType) {
+    public static String getMetaDataDataSourcesPath(final String jobType) {
         return String.join("/", getMetaDataRootPath(jobType), "data_sources");
     }
 
-    private static String getMetaDataRootPath(final JobType jobType) {
+    private static String getMetaDataRootPath(final String jobType) {
         return null == jobType
                 ? String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, "metadata")
-                : String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, jobType.getType().toLowerCase(), "metadata");
+                : String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, jobType.toLowerCase(), "metadata");
     }
 
     /**
@@ -57,7 +56,7 @@ private static String getMetaDataRootPath(final JobType jobType) {
      * @param jobType job type
      * @return data sources path
      */
-    public static String getMetaDataProcessConfigPath(final JobType jobType) {
+    public static String getMetaDataProcessConfigPath(final String jobType) {
         return String.join("/", getMetaDataRootPath(jobType), "process_config");
     }
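The path logic above is now direct: a null job type resolves to the shared metadata root, a non-null one to a per-type root keyed by the lowercased type string. A sketch of the resulting paths, assuming `PipelineNodePath.DATA_PIPELINE_ROOT` is "/pipeline" (its value is not shown in this diff):

    import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
    
    public final class MetaDataPathSketch {
        
        public static void main(final String[] args) {
            // Per-type node: "MIGRATION" is lowercased into the path,
            // e.g. /pipeline/migration/metadata/data_sources (assuming DATA_PIPELINE_ROOT is "/pipeline").
            System.out.println(PipelineMetaDataNode.getMetaDataDataSourcesPath("MIGRATION"));
            // Null job type: the shared metadata node without a type segment,
            // e.g. /pipeline/metadata/process_config.
            System.out.println(PipelineMetaDataNode.getMetaDataProcessConfigPath(null));
        }
    }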
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
 
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
@@ -192,31 +191,31 @@ public interface GovernanceRepositoryAPI {
      * @param jobType job type
      * @return data source properties
      */
-    String getMetaDataDataSources(JobType jobType);
+    String getMetaDataDataSources(String jobType);
 
     /**
      * Persist meta data data sources.
      *
      * @param jobType job type
      * @param metaDataDataSources data source properties
      */
-    void persistMetaDataDataSources(JobType jobType, String metaDataDataSources);
+    void persistMetaDataDataSources(String jobType, String metaDataDataSources);
 
     /**
      * Get meta data process configuration.
      *
      * @param jobType job type, nullable
      * @return process configuration YAML text
      */
-    String getMetaDataProcessConfiguration(JobType jobType);
+    String getMetaDataProcessConfiguration(String jobType);
 
     /**
      * Persist meta data process configuration.
      *
      * @param jobType job type, nullable
      * @param processConfigYamlText process configuration YAML text
      */
-    void persistMetaDataProcessConfiguration(JobType jobType, String processConfigYamlText);
+    void persistMetaDataProcessConfiguration(String jobType, String processConfigYamlText);
 
     /**
      * Get job item error msg.
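The nullable `jobType` on the process-configuration pair is what lets a single global process configuration coexist with per-type ones, since a null type maps to the shared metadata root in `PipelineMetaDataNode` (the first file in this diff). A hypothetical sketch of both levels (the YAML payload parameters are assumptions):

    import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
    
    public final class ProcessConfigLevelsSketch {
        
        public static void persistBothLevels(final GovernanceRepositoryAPI repositoryAPI, final String globalYaml, final String migrationYaml) {
            // Global process configuration, stored under the shared metadata root (null job type).
            repositoryAPI.persistMetaDataProcessConfiguration(null, globalYaml);
            // Per-type configuration, stored under the lowercased type segment.
            repositoryAPI.persistMetaDataProcessConfiguration("MIGRATION", migrationYaml);
        }
    }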
@@ -20,7 +20,6 @@
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
@@ -167,22 +166,22 @@ public List<Integer> getShardingItems(final String jobId) {
     }
 
     @Override
-    public String getMetaDataDataSources(final JobType jobType) {
+    public String getMetaDataDataSources(final String jobType) {
         return repository.getDirectly(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
     }
 
     @Override
-    public void persistMetaDataDataSources(final JobType jobType, final String metaDataDataSources) {
+    public void persistMetaDataDataSources(final String jobType, final String metaDataDataSources) {
         repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType), metaDataDataSources);
     }
 
     @Override
-    public String getMetaDataProcessConfiguration(final JobType jobType) {
+    public String getMetaDataProcessConfiguration(final String jobType) {
         return repository.getDirectly(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType));
     }
 
     @Override
-    public void persistMetaDataProcessConfiguration(final JobType jobType, final String processConfigYamlText) {
+    public void persistMetaDataProcessConfiguration(final String jobType, final String processConfigYamlText) {
         repository.persist(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType), processConfigYamlText);
     }
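Since the implementation maps each call onto a `PipelineMetaDataNode` path, a persist followed by a get with the same type string is a plain key round-trip against the cluster repository. A hypothetical usage sketch (the wrapper method is illustrative; the factory call appears in later files of this diff):

    import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
    import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
    import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
    
    public final class MetaDataRoundTripSketch {
        
        public static String roundTrip(final PipelineContextKey contextKey, final String yamlText) {
            GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey);
            // Both calls resolve to PipelineMetaDataNode.getMetaDataProcessConfigPath("MIGRATION") internally.
            repositoryAPI.persistMetaDataProcessConfiguration("MIGRATION", yamlText);
            return repositoryAPI.getMetaDataProcessConfiguration("MIGRATION");
        }
    }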
@@ -80,12 +80,12 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     @Override
     public void alterProcessConfiguration(final PipelineContextKey contextKey, final PipelineProcessConfiguration processConfig) {
         // TODO check rateLimiter type match or not
-        processConfigPersistService.persist(contextKey, getJobType(), processConfig);
+        processConfigPersistService.persist(contextKey, getType(), processConfig);
     }
 
     @Override
     public PipelineProcessConfiguration showProcessConfiguration(final PipelineContextKey contextKey) {
-        return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, getJobType()));
+        return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, getType()));
     }
 
     @Override
@@ -71,7 +71,7 @@ public List<PipelineJobInfo> list(final PipelineContextKey contextKey) {
 
     private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey) {
         return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
-                .filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getCode().equals(getJobType().getCode()));
+                .filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getType().equals(getType()));
     }
 
     // TODO Add getJobInfo
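The filter's semantics shift slightly here: jobs were previously matched by numeric job code and are now matched by type string, which is equivalent as long as type strings identify job types uniquely. Note that `parseJobType` still returns a `JobType`; it is merely unwrapped with `getType()` at the comparison. Restated as a standalone predicate (a sketch; the import paths are assumptions, not shown in this diff):

    import java.util.function.Predicate;
    
    import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
    import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
    
    public final class SameTypePredicateSketch {
        
        public static Predicate<JobBriefInfo> sameTypeAs(final String type) {
            // Matches jobs whose job ID encodes the given job type string.
            return each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getType().equals(type);
        }
    }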
@@ -19,7 +19,6 @@
 
 import com.google.common.base.Strings;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -39,7 +38,7 @@ public final class PipelineDataSourcePersistService implements PipelineMetaDataP
 
     @Override
     @SuppressWarnings("unchecked")
-    public Map<String, DataSourcePoolProperties> load(final PipelineContextKey contextKey, final JobType jobType) {
+    public Map<String, DataSourcePoolProperties> load(final PipelineContextKey contextKey, final String jobType) {
         String dataSourcesProps = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSources(jobType);
         if (Strings.isNullOrEmpty(dataSourcesProps)) {
             return Collections.emptyMap();
@@ -51,7 +50,7 @@ public Map<String, DataSourcePoolProperties> load(final PipelineContextKey conte
     }
 
     @Override
-    public void persist(final PipelineContextKey contextKey, final JobType jobType, final Map<String, DataSourcePoolProperties> propsMap) {
+    public void persist(final PipelineContextKey contextKey, final String jobType, final Map<String, DataSourcePoolProperties> propsMap) {
         Map<String, Map<String, Object>> dataSourceMap = new LinkedHashMap<>(propsMap.size(), 1F);
         for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
             dataSourceMap.put(entry.getKey(), swapper.swapToMap(entry.getValue()));
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.metadata;
 
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 
 /**
  * Pipeline meta data persist service.
@@ -34,7 +33,7 @@ public interface PipelineMetaDataPersistService<T> {
      * @param jobType job type, nullable
      * @return configurations
      */
-    T load(PipelineContextKey contextKey, JobType jobType);
+    T load(PipelineContextKey contextKey, String jobType);
 
     /**
      * Persist meta data.
@@ -43,5 +42,5 @@ public interface PipelineMetaDataPersistService<T> {
      * @param jobType job type, nullable
      * @param configs configurations
      */
-    void persist(PipelineContextKey contextKey, JobType jobType, T configs);
+    void persist(PipelineContextKey contextKey, String jobType, T configs);
 }
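The generic contract is easiest to see in a skeleton implementation. A hypothetical in-memory example (not part of the PR), with a String payload standing in for the real configuration types:

    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
    import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineMetaDataPersistService;
    
    public final class InMemoryTextPersistService implements PipelineMetaDataPersistService<String> {
        
        private final Map<String, String> store = new HashMap<>();
        
        @Override
        public String load(final PipelineContextKey contextKey, final String jobType) {
            // jobType may be null, addressing the shared metadata root.
            return store.get(jobType);
        }
        
        @Override
        public void persist(final PipelineContextKey contextKey, final String jobType, final String configs) {
            store.put(jobType, configs);
        }
    }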
@@ -22,7 +22,6 @@
 import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
@@ -34,7 +33,7 @@ public final class PipelineProcessConfigurationPersistService implements Pipelin
     private final YamlPipelineProcessConfigurationSwapper swapper = new YamlPipelineProcessConfigurationSwapper();
 
     @Override
-    public PipelineProcessConfiguration load(final PipelineContextKey contextKey, final JobType jobType) {
+    public PipelineProcessConfiguration load(final PipelineContextKey contextKey, final String jobType) {
         String yamlText = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfiguration(jobType);
         if (Strings.isNullOrEmpty(yamlText)) {
             return null;
@@ -44,7 +43,7 @@ public PipelineProcessConfiguration load(final PipelineContextKey contextKey, fi
     }
 
     @Override
-    public void persist(final PipelineContextKey contextKey, final JobType jobType, final PipelineProcessConfiguration processConfig) {
+    public void persist(final PipelineContextKey contextKey, final String jobType, final PipelineProcessConfiguration processConfig) {
         String yamlText = YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig));
         PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).persistMetaDataProcessConfiguration(jobType, yamlText);
     }
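One behavioral detail: `load` returns null when nothing has been persisted for the given type, and callers apply defaults afterwards, as `showProcessConfiguration` does with `convertWithDefaultValue` earlier in this diff. A hedged caller-side sketch (the import paths for the two configuration classes are assumptions):

    import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
    import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
    import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
    import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
    
    public final class LoadWithDefaultsSketch {
        
        public static PipelineProcessConfiguration loadEffective(final PipelineContextKey contextKey) {
            // A null result means "never configured"; convert with defaults before use.
            PipelineProcessConfiguration loaded = new PipelineProcessConfigurationPersistService().load(contextKey, "MIGRATION");
            return PipelineProcessConfigurationUtils.convertWithDefaultValue(loaded);
        }
    }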
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.common.metadata.node;
 
-import org.apache.shardingsphere.data.pipeline.common.job.type.FixtureJobType;
 import org.hamcrest.MatcherAssert;
 import org.junit.jupiter.api.Test;
 
@@ -38,12 +37,12 @@ class PipelineMetaDataNodeTest {
 
     @Test
     void assertGetMetaDataDataSourcesPath() {
-        MatcherAssert.assertThat(PipelineMetaDataNode.getMetaDataDataSourcesPath(new FixtureJobType()), is(migrationMetaDataRootPath + "/data_sources"));
+        MatcherAssert.assertThat(PipelineMetaDataNode.getMetaDataDataSourcesPath("FIXTURE"), is(migrationMetaDataRootPath + "/data_sources"));
    }
 
     @Test
     void assertGetMetaDataProcessConfigPath() {
-        assertThat(PipelineMetaDataNode.getMetaDataProcessConfigPath(new FixtureJobType()), is(migrationMetaDataRootPath + "/process_config"));
+        assertThat(PipelineMetaDataNode.getMetaDataProcessConfigPath("FIXTURE"), is(migrationMetaDataRootPath + "/process_config"));
     }
 
     @Test
@@ -135,7 +135,7 @@ public String createJobAndStart(final PipelineContextKey contextKey, final Migra
     private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
         YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
         result.setTargetDatabaseName(param.getTargetDatabaseName());
-        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, new MigrationJobType());
+        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
         Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
         Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
         List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
@@ -405,7 +405,7 @@ public void commit(final String jobId) {
      * @param propsMap data source pool properties map
      */
     public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
-        Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getJobType());
+        Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
         Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
         for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
             if (existDataSources.containsKey(entry.getKey())) {
@@ -415,7 +415,7 @@ public void addMigrationSourceResources(final PipelineContextKey contextKey, fin
         ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () -> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
         Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
         result.putAll(propsMap);
-        dataSourcePersistService.persist(contextKey, getJobType(), result);
+        dataSourcePersistService.persist(contextKey, getType(), result);
     }
 
     /**
@@ -425,13 +425,13 @@ public void addMigrationSourceResources(final PipelineContextKey contextKey, fin
      * @param resourceNames resource names
      */
     public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
-        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getJobType());
+        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
         List<String> noExistResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
         ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () -> new UnregisterMigrationSourceStorageUnitException(noExistResources));
         for (String each : resourceNames) {
             metaDataDataSource.remove(each);
         }
-        dataSourcePersistService.persist(contextKey, getJobType(), metaDataDataSource);
+        dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
     }
 
     /**
@@ -441,7 +441,7 @@ public void dropMigrationSourceResources(final PipelineContextKey contextKey, fi
      * @return migration source resources
      */
     public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
-        Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getJobType());
+        Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
         Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
         for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
             String dataSourceName = entry.getKey();
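With `getType()` presumably returning "MIGRATION" for this job API (matching the literal used in `buildYamlJobConfiguration` above), all three source-resource operations read and write the same metadata node. A condensed hypothetical round-trip (the `MigrationJobAPI` import path is an assumption):

    import java.util.Collection;
    import java.util.Collections;
    
    import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
    import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
    import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
    
    public final class SourceResourceRoundTripSketch {
        
        public static void roundTrip(final MigrationJobAPI jobAPI, final PipelineContextKey contextKey, final DataSourcePoolProperties props) {
            // Register a migration source storage unit, list it, then unregister it again.
            jobAPI.addMigrationSourceResources(contextKey, Collections.singletonMap("ds_0", props));
            Collection<Collection<Object>> listed = jobAPI.listMigrationSourceResources(contextKey);
            System.out.println(listed.size()); // 1: the row registered above
            jobAPI.dropMigrationSourceResources(contextKey, Collections.singleton("ds_0"));
        }
    }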
@@ -22,9 +22,7 @@
 import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineWriteConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -56,9 +54,8 @@ void assertLoadAndPersist() {
         String expectedYamlText = YamlEngine.marshal(yamlProcessConfig);
         PipelineProcessConfiguration processConfig = new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
         PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService();
-        JobType jobType = new MigrationJobType();
-        persistService.persist(PipelineContextUtils.getContextKey(), jobType, processConfig);
-        String actualYamlText = YamlEngine.marshal(new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(), jobType)));
+        persistService.persist(PipelineContextUtils.getContextKey(), "MIGRATION", processConfig);
+        String actualYamlText = YamlEngine.marshal(new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(), "MIGRATION")));
         assertThat(actualYamlText, is(expectedYamlText));
     }
 }