Refactor PipelineJobIdUtils (#29283)
* Revise javadoc of PipelineJobId

* Refactor PipelineJobIdUtils
terrymanu authored Dec 4, 2023
1 parent 41c3a5d commit 0b73645
Showing 13 changed files with 46 additions and 41 deletions.
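In short, PipelineJobId implementations no longer marshal themselves; marshalling now goes through a single entry point in PipelineJobIdUtils. The call-site change, taken from the migration diffs below:

    // Call-site change, as it appears in the migration diffs below.
    PipelineContextKey contextKey = new PipelineContextKey("sharding_db", InstanceType.PROXY);
    // Before this commit: each job id type marshalled itself.
    String jobId = new MigrationJobId(contextKey).marshal();
    // After: a single shared entry point marshals any PipelineJobId.
    jobId = PipelineJobIdUtils.marshal(new MigrationJobId(contextKey));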
PipelineJobId.java
@@ -17,6 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.common.job;
 
+import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+
 /**
  * Pipeline job id.
  */
@@ -25,9 +28,16 @@ public interface PipelineJobId {
     String CURRENT_VERSION = "02";
 
     /**
-     * Marshal job ID.
+     * Get pipeline job type.
      *
-     * @return job ID
+     * @return pipeline job type
      */
+    PipelineJobType getJobType();
+
+    /**
+     * Get pipeline context key.
+     *
+     * @return pipeline context key
+     */
-    String marshal();
+    PipelineContextKey getContextKey();
 }
PipelineJobIdUtils.java
@@ -24,6 +24,7 @@
 import lombok.SneakyThrows;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
@@ -34,6 +35,7 @@
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;
 
 import java.nio.charset.StandardCharsets;

@@ -46,11 +48,14 @@ public final class PipelineJobIdUtils {
     /**
      * Marshal job id prefix.
      *
-     * @param jobType pipeline job type
-     * @param contextKey pipeline context key
-     * @return job id common prefix
+     * @param jobId pipeline job id
+     * @return job id
      */
-    public static String marshalPrefix(final PipelineJobType jobType, final PipelineContextKey contextKey) {
+    public static String marshal(final PipelineJobId jobId) {
+        return marshalPrefix(jobId.getJobType(), jobId.getContextKey()) + marshalSuffix(jobId);
+    }
+
+    private static String marshalPrefix(final PipelineJobType jobType, final PipelineContextKey contextKey) {
         InstanceType instanceType = contextKey.getInstanceType();
         String databaseName = instanceType == InstanceType.PROXY ? "" : contextKey.getDatabaseName();
         String databaseNameHex = Hex.encodeHexString(databaseName.getBytes(StandardCharsets.UTF_8), true);
@@ -59,6 +64,10 @@ public static String marshalPrefix(final PipelineJobType jobType, final Pipeline
         return 'j' + jobType.getCode() + PipelineJobId.CURRENT_VERSION + encodedInstanceType + databaseNameLengthHex + databaseNameHex;
     }
 
+    private static String marshalSuffix(final PipelineJobId jobId) {
+        return DigestUtils.md5Hex(JsonUtils.toJsonString(jobId).getBytes(StandardCharsets.UTF_8));
+    }
+
     /**
      * Parse job type.
      *
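With both pieces in place, a marshalled id is the prefix above ('j' + two-digit job type code + CURRENT_VERSION "02" + encoded instance type + database name length + hex-encoded database name) followed by the 32-character md5 hex of the job id's JSON form. A round-trip sketch built only from calls that appear in this commit's diffs:

    // Marshal a job id, then recover its job type and context key back out.
    PipelineContextKey contextKey = new PipelineContextKey("sharding_db", InstanceType.PROXY);
    String jobId = PipelineJobIdUtils.marshal(new MigrationJobId(contextKey));
    assert PipelineJobIdUtils.parseJobType(jobId) instanceof MigrationJobType;
    assert PipelineJobIdUtils.parseContextKey(jobId).getInstanceType() == InstanceType.PROXY;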
CDCJobId.java
@@ -17,21 +17,19 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc;
 
-import com.google.common.base.Joiner;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 /**
  * CDC job id.
  */
 @RequiredArgsConstructor
+@Getter
 public final class CDCJobId implements PipelineJobId {
 
     private final PipelineJobType jobType = new CDCJobType();
@@ -41,9 +39,4 @@ public final class CDCJobId implements PipelineJobId {
     private final List<String> schemaTableNames;
 
     private final boolean full;
-
-    @Override
-    public String marshal() {
-        return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + DigestUtils.md5Hex(Joiner.on('|').join(contextKey.getDatabaseName(), schemaTableNames, full).getBytes(StandardCharsets.UTF_8));
-    }
 }
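Note what this removal changes for CDC ids: the old suffix hashed a '|'-joined field string, while the shared marshalSuffix hashes the whole job id serialized as JSON, so an identical CDC job will generally marshal to a different id string than before this commit. Side by side (both expressions are taken from this commit's diffs):

    // Old, CDC-specific suffix (removed above):
    DigestUtils.md5Hex(Joiner.on('|').join(contextKey.getDatabaseName(), schemaTableNames, full).getBytes(StandardCharsets.UTF_8));
    // New, shared suffix (PipelineJobIdUtils.marshalSuffix):
    DigestUtils.md5Hex(JsonUtils.toJsonString(jobId).getBytes(StandardCharsets.UTF_8));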
(file path not shown)
@@ -134,7 +134,7 @@ public String create(final StreamDataParameter param, final CDCSinkType sinkType
 
     private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) {
         YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
-        result.setJobId(new CDCJobId(contextKey, param.getSchemaTableNames(), param.isFull()).marshal());
+        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, param.getSchemaTableNames(), param.isFull())));
         result.setDatabaseName(param.getDatabaseName());
         result.setSchemaTableNames(param.getSchemaTableNames());
         result.setFull(param.isFull());
CDCJobIdTest.java
@@ -22,14 +22,16 @@
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 class CDCJobIdTest {
 
     @Test
     void assertParseJobType() {
-        String jobId = PipelineJobIdUtils.marshalPrefix(new CDCJobType(), new PipelineContextKey("sharding_db", InstanceType.PROXY)) + "abcd";
+        String jobId = PipelineJobIdUtils.marshal(new CDCJobId(new PipelineContextKey("sharding_db", InstanceType.PROXY), Collections.singletonList("foo"), true));
         assertThat(PipelineJobIdUtils.parseJobType(jobId), instanceOf(CDCJobType.class));
     }
 }
ConsistencyCheckJobId.java
@@ -17,15 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
+import lombok.Getter;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
 
 /**
  * Consistency check job id.
  */
+@Getter
 public final class ConsistencyCheckJobId implements PipelineJobId {
 
     private final PipelineJobType jobType = new ConsistencyCheckJobType();
@@ -59,9 +60,4 @@ public ConsistencyCheckJobId(final PipelineContextKey contextKey, final String p
     public static int parseSequence(final String checkJobId) {
         return Integer.parseInt(checkJobId.substring(checkJobId.length() - 1));
     }
-
-    @Override
-    public String marshal() {
-        return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + parentJobId + sequence;
-    }
 }
(file path not shown)
@@ -96,7 +96,8 @@ public String start(final CreateConsistencyCheckJobParameter param) {
         }
         checkPipelineDatabaseTypes(param);
         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
-        String result = latestCheckJobId.map(optional -> new ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
+        String result = PipelineJobIdUtils.marshal(
+                latestCheckJobId.map(optional -> new ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)));
         governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, result);
         governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId, result);
         jobManager.drop(result);
@@ -155,7 +156,7 @@ public void drop(final String parentJobId) {
         Optional<Integer> previousSequence = ConsistencyCheckSequence.getPreviousSequence(
                 checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()), ConsistencyCheckJobId.parseSequence(latestCheckJobId));
         if (previousSequence.isPresent()) {
-            String checkJobId = new ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()).marshal();
+            String checkJobId = PipelineJobIdUtils.marshal(new ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()));
             governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, checkJobId);
         } else {
             governanceFacade.getJobFacade().getCheck().deleteLatestCheckJobId(parentJobId);
MigrationJobId.java
@@ -17,28 +17,20 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.infra.util.json.JsonUtils;
-
-import java.nio.charset.StandardCharsets;
 
 /**
  * Migration job id.
  */
 @RequiredArgsConstructor
+@Getter
 public final class MigrationJobId implements PipelineJobId {
 
     private final PipelineJobType jobType = new MigrationJobType();
 
     private final PipelineContextKey contextKey;
-
-    @Override
-    public String marshal() {
-        return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + DigestUtils.md5Hex(JsonUtils.toJsonString(this).getBytes(StandardCharsets.UTF_8));
-    }
 }
(file path not shown)
@@ -162,7 +162,7 @@ private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineCo
         result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
         result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
         result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        result.setJobId(new MigrationJobId(contextKey).marshal());
+        result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey)));
         return result;
     }
 
(file path not shown)
@@ -38,7 +38,7 @@ void assertParse() {
 
     private void assertParse0(final InstanceType instanceType) {
         PipelineContextKey contextKey = new PipelineContextKey("sharding_db", instanceType);
-        String jobId = new MigrationJobId(contextKey).marshal();
+        String jobId = PipelineJobIdUtils.marshal(new MigrationJobId(contextKey));
         assertThat(PipelineJobIdUtils.parseJobType(jobId), instanceOf(MigrationJobType.class));
         PipelineContextKey actualContextKey = PipelineJobIdUtils.parseContextKey(jobId);
         assertThat(actualContextKey.getInstanceType(), is(instanceType));
JobConfigurationBuilder.java
@@ -27,6 +27,7 @@
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
@@ -76,7 +77,7 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration(
         result.setTablesFirstDataNodes("t_order:ds_0.t_order");
         result.setJobShardingDataNodes(Collections.singletonList("t_order:ds_0.t_order"));
         PipelineContextKey contextKey = new PipelineContextKey(RandomStringUtils.randomAlphabetic(32), InstanceType.PROXY);
-        result.setJobId(new MigrationJobId(contextKey).marshal());
+        result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey)));
         Map<String, YamlPipelineDataSourceConfiguration> sources = new LinkedHashMap<>();
         String databaseNameSuffix = RandomStringUtils.randomAlphabetic(9);
         PipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(
(file path not shown)
@@ -20,6 +20,7 @@
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -49,7 +50,7 @@ static void beforeClass() {
     @Test
     void assertBuildPipelineJobItemContext() {
         ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new PipelineContextKey(InstanceType.PROXY), JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
-        String checkJobId = pipelineJobId.marshal();
+        String checkJobId = PipelineJobIdUtils.marshal(pipelineJobId);
         Map<String, Object> expectTableCheckPosition = Collections.singletonMap("t_order", 100);
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId, 0,
                 YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
(file path not shown)
@@ -72,7 +72,7 @@ void assertCreateJobConfig() {
                 parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
         ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobConfigurationManager(jobType).getJobConfiguration(checkJobId);
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
-        String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal();
+        String expectCheckJobId = PipelineJobIdUtils.marshal(new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence));
         assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
         assertNull(checkJobConfig.getAlgorithmTypeName());
         int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);
