Move PipelineJobAPI.marshalJobId() to PipelineJobId.marshal() #29010

Merged 1 commit on Nov 11, 2023
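
Summary: job ID marshalling moves from the API layer onto the job ID itself. Each PipelineJobId implementation now builds its own string via marshal(), so PipelineJobAPI.marshalJobId() and the marshalJobIdLeftPart() hook in AbstractPipelineJobAPIImpl are removed. A minimal before/after sketch of a call site, using the values from the updated PipelineJobIdUtils test below (illustrative, not part of the diff):

    PipelineContextKey contextKey = new PipelineContextKey("sharding_db", InstanceType.PROXY);
    // Before this change, marshalling required an API instance:
    //     String jobId = new MigrationJobAPI().marshalJobId(new MigrationJobId(contextKey, dataNodes));
    // After, the job ID marshals itself:
    String jobId = new MigrationJobId(contextKey, Collections.singletonList("t_order:ds_0.t_order_0,ds_0.t_order_1")).marshal();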
PipelineJobId:
@@ -45,4 +45,11 @@ public interface PipelineJobId {
      * @return context key
      */
     PipelineContextKey getContextKey();
+
+    /**
+     * Marshal job ID.
+     *
+     * @return job ID
+     */
+    String marshal();
 }

PipelineJobAPI:
@@ -24,7 +24,6 @@
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
@@ -40,14 +39,6 @@
 @SingletonSPI
 public interface PipelineJobAPI extends TypedSPI {
 
-    /**
-     * Marshal pipeline job id.
-     *
-     * @param pipelineJobId pipeline job id
-     * @return marshaled text
-     */
-    String marshalJobId(PipelineJobId pipelineJobId);
-
     /**
      * Extend YAML job configuration.
      *

AbstractPipelineJobAPIImpl:
@@ -22,7 +22,6 @@
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
@@ -57,13 +56,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
 
     protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
 
-    @Override
-    public final String marshalJobId(final PipelineJobId pipelineJobId) {
-        return PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + marshalJobIdLeftPart(pipelineJobId);
-    }
-
-    protected abstract String marshalJobIdLeftPart(PipelineJobId pipelineJobId);
-
     @Override
     public List<PipelineJobInfo> list(final PipelineContextKey contextKey) {
         return getJobBriefInfos(contextKey).map(each -> getJobInfo(each.getJobName())).collect(Collectors.toList());

CDC job API implementation (org.apache.shardingsphere.data.pipeline.cdc.api.impl):
@@ -17,10 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.api.impl;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
@@ -48,7 +46,6 @@
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
@@ -84,7 +81,6 @@
 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
 
-import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.HashMap;
@@ -242,7 +238,7 @@ public void updateJobConfigurationDisabled(final String jobId, final boolean dis
     public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
         YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig;
         if (null == yamlJobConfig.getJobId()) {
-            config.setJobId(generateJobId(contextKey, config));
+            config.setJobId(new CDCJobId(contextKey, config.getSchemaTableNames(), config.isFull(), config.getSinkConfig().getSinkType()).marshal());
         }
         if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
             PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
@@ -251,18 +247,6 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina
         }
     }
 
-    private String generateJobId(final PipelineContextKey contextKey, final YamlCDCJobConfiguration config) {
-        CDCJobId jobId = new CDCJobId(contextKey, config.getSchemaTableNames(), config.isFull(), config.getSinkConfig().getSinkType());
-        return marshalJobId(jobId);
-    }
-
-    @Override
-    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
-        CDCJobId jobId = (CDCJobId) pipelineJobId;
-        String text = Joiner.on('|').join(jobId.getContextKey().getDatabaseName(), jobId.getSchemaTableNames(), jobId.isFull());
-        return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
-    }
-
     @Override
     public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
         CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;

CDCJobId:
@@ -17,12 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.core.job;
 
+import com.google.common.base.Joiner;
 import lombok.Getter;
 import lombok.ToString;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 /**
@@ -44,4 +48,9 @@ public CDCJobId(final PipelineContextKey contextKey, final List<String> schemaTa
         this.full = full;
         this.sinkType = sinkType;
     }
+
+    @Override
+    public String marshal() {
+        return PipelineJobIdUtils.marshalJobIdCommonPrefix(this) + DigestUtils.md5Hex(Joiner.on('|').join(getContextKey().getDatabaseName(), schemaTableNames, full).getBytes(StandardCharsets.UTF_8));
+    }
 }

ConsistencyCheckJobId:
@@ -21,6 +21,7 @@
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
 
 /**
@@ -57,4 +58,9 @@ public ConsistencyCheckJobId(final PipelineContextKey contextKey, final String p
     public static int parseSequence(final String checkJobId) {
         return Integer.parseInt(checkJobId.substring(checkJobId.length() - 1));
     }
+
+    @Override
+    public String marshal() {
+        return PipelineJobIdUtils.marshalJobIdCommonPrefix(this) + parentJobId + sequence;
+    }
 }
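
For illustration: the check job ID is the marshalled common prefix followed by the parent job ID and the sequence, and parseSequence() reads the sequence back from the last character, which assumes a single-digit sequence. A standalone sketch of that round trip, with hypothetical prefix and parent job ID values (not ShardingSphere code):

    // Hypothetical values; the real prefix comes from PipelineJobIdUtils.marshalJobIdCommonPrefix(this).
    String prefix = "j0201";
    String parentJobId = "abc123";
    int sequence = 2;
    String checkJobId = prefix + parentJobId + sequence;
    // Mirrors ConsistencyCheckJobId.parseSequence(): the last character is the sequence.
    int parsed = Integer.parseInt(checkJobId.substring(checkJobId.length() - 1));
    assert parsed == sequence;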

ConsistencyCheckJobAPI:
@@ -26,7 +26,6 @@
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
@@ -84,12 +83,6 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
 
     private final YamlConsistencyCheckJobItemProgressSwapper swapper = new YamlConsistencyCheckJobItemProgressSwapper();
 
-    @Override
-    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
-        ConsistencyCheckJobId jobId = (ConsistencyCheckJobId) pipelineJobId;
-        return jobId.getParentJobId() + jobId.getSequence();
-    }
-
     /**
      * Create consistency check configuration and start job.
      *
@@ -110,7 +103,7 @@ public String createJobAndStart(final CreateConsistencyCheckJobParameter param)
         }
         verifyPipelineDatabaseType(param);
         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
-        String result = marshalJobId(latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)));
+        String result = latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
         repositoryAPI.persistLatestCheckJobId(parentJobId, result);
         repositoryAPI.deleteCheckJobResult(parentJobId, result);
         dropJob(result);
@@ -235,7 +228,7 @@ public void dropByParentJobId(final String parentJobId) {
         Optional<Integer> previousSequence = ConsistencyCheckSequence.getPreviousSequence(
                 checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()), ConsistencyCheckJobId.parseSequence(latestCheckJobId));
         if (previousSequence.isPresent()) {
-            String checkJobId = marshalJobId(new ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()));
+            String checkJobId = new ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()).marshal();
             repositoryAPI.persistLatestCheckJobId(parentJobId, checkJobId);
         } else {
             repositoryAPI.deleteLatestCheckJobId(parentJobId);

MigrationJobId:
@@ -19,9 +19,13 @@
 
 import lombok.Getter;
 import lombok.ToString;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 /**
@@ -37,4 +41,9 @@ public MigrationJobId(final PipelineContextKey contextKey, final List<String> jo
         super(new MigrationJobType(), contextKey);
         this.jobShardingDataNodes = jobShardingDataNodes;
     }
+
+    @Override
+    public String marshal() {
+        return PipelineJobIdUtils.marshalJobIdCommonPrefix(this) + DigestUtils.md5Hex(JsonUtils.toJsonString(this).getBytes(StandardCharsets.UTF_8));
+    }
 }

MigrationJobAPI:
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
@@ -37,7 +36,6 @@
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
@@ -91,7 +89,6 @@
 import org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 
-import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -211,12 +208,6 @@ private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<Dat
         return result;
     }
 
-    @Override
-    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
-        String text = JsonUtils.toJsonString(pipelineJobId);
-        return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
-    }
-
     @Override
     protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
@@ -231,15 +222,10 @@ protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
     public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
         YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig;
         if (null == yamlJobConfig.getJobId()) {
-            config.setJobId(generateJobId(contextKey, config));
+            config.setJobId(new MigrationJobId(contextKey, config.getJobShardingDataNodes()).marshal());
         }
     }
 
-    private String generateJobId(final PipelineContextKey contextKey, final YamlMigrationJobConfiguration config) {
-        MigrationJobId jobId = new MigrationJobId(contextKey, config.getJobShardingDataNodes());
-        return marshalJobId(jobId);
-    }
-
     @Override
     protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {
         return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration((MigrationJobConfiguration) jobConfig);

PipelineJobIdUtils test:
@@ -20,7 +20,6 @@
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.junit.jupiter.api.Test;
 
@@ -41,8 +40,7 @@ void assertParse() {
 
     private void assertParse0(final InstanceType instanceType) {
         PipelineContextKey contextKey = new PipelineContextKey("sharding_db", instanceType);
-        MigrationJobId pipelineJobId = new MigrationJobId(contextKey, Collections.singletonList("t_order:ds_0.t_order_0,ds_0.t_order_1"));
-        String jobId = new MigrationJobAPI().marshalJobId(pipelineJobId);
+        String jobId = new MigrationJobId(contextKey, Collections.singletonList("t_order:ds_0.t_order_0,ds_0.t_order_1")).marshal();
         assertThat(PipelineJobIdUtils.parseJobType(jobId), instanceOf(MigrationJobType.class));
         PipelineContextKey actualContextKey = PipelineJobIdUtils.parseContextKey(jobId);
         assertThat(actualContextKey.getInstanceType(), is(instanceType));

Migration job configuration test utility:
@@ -23,18 +23,17 @@
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.test.util.ConfigurationFileUtils;
@@ -79,7 +78,7 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration(
         result.setTablesFirstDataNodes("t_order:ds_0.t_order");
         result.setJobShardingDataNodes(Collections.singletonList("t_order:ds_0.t_order"));
         PipelineContextKey contextKey = new PipelineContextKey(RandomStringUtils.randomAlphabetic(32), InstanceType.PROXY);
-        result.setJobId(generateMigrationJobId(contextKey, result));
+        result.setJobId(new MigrationJobId(contextKey, result.getJobShardingDataNodes()).marshal());
         Map<String, YamlPipelineDataSourceConfiguration> sources = new LinkedHashMap<>();
         String databaseNameSuffix = RandomStringUtils.randomAlphabetic(9);
         PipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(
@@ -100,11 +99,6 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration(
         return result;
     }
 
-    private static String generateMigrationJobId(final PipelineContextKey contextKey, final YamlMigrationJobConfiguration yamlJobConfig) {
-        MigrationJobId migrationJobId = new MigrationJobId(contextKey, yamlJobConfig.getJobShardingDataNodes());
-        return new MigrationJobAPI().marshalJobId(migrationJobId);
-    }
-
     private static YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final PipelineDataSourceConfiguration config) {
         YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
         result.setType(config.getType());