From 0b736453ea5649343de9457111d21caf7f7db311 Mon Sep 17 00:00:00 2001
From: Liang Zhang
Date: Mon, 4 Dec 2023 23:45:52 +0800
Subject: [PATCH] Refactor PipelineJobIdUtils (#29283)

* Revise javadoc of PipelineJobId

* Refactor PipelineJobIdUtils
---
 .../data/pipeline/common/job/PipelineJobId.java    | 16 +++++++++++++---
 .../pipeline/core/job/PipelineJobIdUtils.java      | 17 +++++++++++++----
 .../data/pipeline/cdc/CDCJobId.java                | 11 ++---------
 .../data/pipeline/cdc/api/CDCJobAPI.java           |  2 +-
 .../data/pipeline/cdc/CDCJobIdTest.java            |  4 +++-
 .../consistencycheck/ConsistencyCheckJobId.java    |  8 ++------
 .../api/ConsistencyCheckJobAPI.java                |  5 +++--
 .../scenario/migration/MigrationJobId.java         | 12 ++----------
 .../scenario/migration/api/MigrationJobAPI.java    |  2 +-
 .../core/job/PipelineJobIdUtilsTest.java           |  2 +-
 .../core/util/JobConfigurationBuilder.java         |  3 ++-
 .../ConsistencyCheckJobTest.java                   |  3 ++-
 .../api/impl/ConsistencyCheckJobAPITest.java       |  2 +-
 13 files changed, 46 insertions(+), 41 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
index 88d46ab720d70..54e492672c22e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
@@ -17,6 +17,9 @@

 package org.apache.shardingsphere.data.pipeline.common.job;

+import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+
 /**
  * Pipeline job id.
  */
@@ -25,9 +28,16 @@ public interface PipelineJobId {
     String CURRENT_VERSION = "02";

     /**
-     * Marshal job ID.
+     * Get pipeline job type.
      *
-     * @return job ID
+     * @return pipeline job type
+     */
+    PipelineJobType getJobType();
+
+    /**
+     * Get pipeline context key.
+     *
+     * @return pipeline context key
      */
-    String marshal();
+    PipelineContextKey getContextKey();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index add4d57766d01..2ebadee92d7d0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -24,6 +24,7 @@
 import lombok.SneakyThrows;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
@@ -34,6 +35,7 @@
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;

 import java.nio.charset.StandardCharsets;

@@ -46,11 +48,14 @@ public final class PipelineJobIdUtils {
     /**
      * Marshal job id prefix.
      *
-     * @param jobType pipeline job type
-     * @param contextKey pipeline context key
-     * @return job id common prefix
+     * @param jobId pipeline job id
+     * @return job id
      */
-    public static String marshalPrefix(final PipelineJobType jobType, final PipelineContextKey contextKey) {
+    public static String marshal(final PipelineJobId jobId) {
+        return marshalPrefix(jobId.getJobType(), jobId.getContextKey()) + marshalSuffix(jobId);
+    }
+
+    private static String marshalPrefix(final PipelineJobType jobType, final PipelineContextKey contextKey) {
         InstanceType instanceType = contextKey.getInstanceType();
         String databaseName = instanceType == InstanceType.PROXY ? "" : contextKey.getDatabaseName();
         String databaseNameHex = Hex.encodeHexString(databaseName.getBytes(StandardCharsets.UTF_8), true);
@@ -59,6 +64,10 @@ public static String marshalPrefix(final PipelineJobType jobType, final Pipeline
         return 'j' + jobType.getCode() + PipelineJobId.CURRENT_VERSION + encodedInstanceType + databaseNameLengthHex + databaseNameHex;
     }

+    private static String marshalSuffix(final PipelineJobId jobId) {
+        return DigestUtils.md5Hex(JsonUtils.toJsonString(jobId).getBytes(StandardCharsets.UTF_8));
+    }
+
     /**
      * Parse job type.
      *
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
index 31e406c53cacb..86ecadbba18bd 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
@@ -17,21 +17,19 @@

 package org.apache.shardingsphere.data.pipeline.cdc;

-import com.google.common.base.Joiner;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;

-import java.nio.charset.StandardCharsets;
 import java.util.List;

 /**
  * CDC job id.
  */
 @RequiredArgsConstructor
+@Getter
 public final class CDCJobId implements PipelineJobId {

     private final PipelineJobType jobType = new CDCJobType();
@@ -41,9 +39,4 @@ public final class CDCJobId implements PipelineJobId {
     private final List<String> schemaTableNames;

     private final boolean full;
-
-    @Override
-    public String marshal() {
-        return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + DigestUtils.md5Hex(Joiner.on('|').join(contextKey.getDatabaseName(), schemaTableNames, full).getBytes(StandardCharsets.UTF_8));
-    }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index d3667fd81114d..ea30013cd482f 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -134,7 +134,7 @@ public String create(final StreamDataParameter param, final CDCSinkType sinkType

     private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) {
         YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
-        result.setJobId(new CDCJobId(contextKey, param.getSchemaTableNames(), param.isFull()).marshal());
+        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, param.getSchemaTableNames(), param.isFull())));
         result.setDatabaseName(param.getDatabaseName());
         result.setSchemaTableNames(param.getSchemaTableNames());
         result.setFull(param.isFull());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
index b7657475f2fc6..f0d0c0b0beeb7 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
@@ -22,6 +22,8 @@
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.junit.jupiter.api.Test;

+import java.util.Collections;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;

@@ -29,7 +31,7 @@ class CDCJobIdTest {

     @Test
     void assertParseJobType() {
-        String jobId = PipelineJobIdUtils.marshalPrefix(new CDCJobType(), new PipelineContextKey("sharding_db", InstanceType.PROXY)) + "abcd";
+        String jobId = PipelineJobIdUtils.marshal(new CDCJobId(new PipelineContextKey("sharding_db", InstanceType.PROXY), Collections.singletonList("foo"), true));
         assertThat(PipelineJobIdUtils.parseJobType(jobId), instanceOf(CDCJobType.class));
     }
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index 295145dc2f8dd..06dfe9ee8ab87 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -17,15 +17,16 @@

 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;

+import lombok.Getter;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;

 /**
  * Consistency check job id.
  */
+@Getter
 public final class ConsistencyCheckJobId implements PipelineJobId {

     private final PipelineJobType jobType = new ConsistencyCheckJobType();
@@ -59,9 +60,4 @@ public ConsistencyCheckJobId(final PipelineContextKey contextKey, final String p
     public static int parseSequence(final String checkJobId) {
         return Integer.parseInt(checkJobId.substring(checkJobId.length() - 1));
     }
-
-    @Override
-    public String marshal() {
-        return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + parentJobId + sequence;
-    }
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index e4b7f31ebea5d..be1d5ee6f60c8 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -96,7 +96,8 @@ public String start(final CreateConsistencyCheckJobParameter param) {
         }
         checkPipelineDatabaseTypes(param);
         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
-        String result = latestCheckJobId.map(optional -> new ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
+        String result = PipelineJobIdUtils.marshal(
+                latestCheckJobId.map(optional -> new ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)));
         governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, result);
         governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId, result);
         jobManager.drop(result);
@@ -155,7 +156,7 @@ public void drop(final String parentJobId) {
         Optional<Integer> previousSequence = ConsistencyCheckSequence.getPreviousSequence(
                 checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()), ConsistencyCheckJobId.parseSequence(latestCheckJobId));
         if (previousSequence.isPresent()) {
-            String checkJobId = new ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()).marshal();
+            String checkJobId = PipelineJobIdUtils.marshal(new ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()));
             governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, checkJobId);
         } else {
             governanceFacade.getJobFacade().getCheck().deleteLatestCheckJobId(parentJobId);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index b21e1de471804..b0a13d30437e4 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -17,28 +17,20 @@

 package org.apache.shardingsphere.data.pipeline.scenario.migration;

+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.infra.util.json.JsonUtils;
-
-import java.nio.charset.StandardCharsets;

 /**
  * Migration job id.
  */
 @RequiredArgsConstructor
+@Getter
 public final class MigrationJobId implements PipelineJobId {

     private final PipelineJobType jobType = new MigrationJobType();

     private final PipelineContextKey contextKey;
-
-    @Override
-    public String marshal() {
-        return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + DigestUtils.md5Hex(JsonUtils.toJsonString(this).getBytes(StandardCharsets.UTF_8));
-    }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index f55ae220f422b..75d1bf91c5ece 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -162,7 +162,7 @@ private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineCo
         result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
         result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
         result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        result.setJobId(new MigrationJobId(contextKey).marshal());
+        result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey)));
         return result;
     }

diff --git a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index 6af65a7a0bcd7..f7c7bcf31bb28 100644
--- a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++ b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -38,7 +38,7 @@ void assertParse() {

     private void assertParse0(final InstanceType instanceType) {
         PipelineContextKey contextKey = new PipelineContextKey("sharding_db", instanceType);
-        String jobId = new MigrationJobId(contextKey).marshal();
+        String jobId = PipelineJobIdUtils.marshal(new MigrationJobId(contextKey));
         assertThat(PipelineJobIdUtils.parseJobType(jobId), instanceOf(MigrationJobType.class));
         PipelineContextKey actualContextKey = PipelineJobIdUtils.parseContextKey(jobId);
         assertThat(actualContextKey.getInstanceType(), is(instanceType));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index 1f27e0079997d..8a63105604c22 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -27,6 +27,7 @@
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
@@ -76,7 +77,7 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration(
         result.setTablesFirstDataNodes("t_order:ds_0.t_order");
         result.setJobShardingDataNodes(Collections.singletonList("t_order:ds_0.t_order"));
         PipelineContextKey contextKey = new PipelineContextKey(RandomStringUtils.randomAlphabetic(32), InstanceType.PROXY);
-        result.setJobId(new MigrationJobId(contextKey).marshal());
+        result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey)));
         Map sources = new LinkedHashMap<>();
         String databaseNameSuffix = RandomStringUtils.randomAlphabetic(9);
         PipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 1905c5dd26339..9f86dcef8f4e9 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -20,6 +20,7 @@
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -49,7 +50,7 @@ static void beforeClass() {

     @Test
     void assertBuildPipelineJobItemContext() {
         ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new PipelineContextKey(InstanceType.PROXY), JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
-        String checkJobId = pipelineJobId.marshal();
+        String checkJobId = PipelineJobIdUtils.marshal(pipelineJobId);
         Map expectTableCheckPosition = Collections.singletonMap("t_order", 100);
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId, 0, YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index e3b53e10fcfc6..902bc94a93a24 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -72,7 +72,7 @@ void assertCreateJobConfig() {
                 parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
         ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobConfigurationManager(jobType).getJobConfiguration(checkJobId);
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
-        String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal();
+        String expectCheckJobId = PipelineJobIdUtils.marshal(new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence));
         assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
         assertNull(checkJobConfig.getAlgorithmTypeName());
         int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);
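
For reference, a minimal usage sketch of the refactored API. This is illustrative only and not part of the patch: the JobIdMarshalExample class is hypothetical, while the pipeline classes and methods are exactly those shown in the diff above. After this change a concrete job id only exposes getJobType() and getContextKey(); PipelineJobIdUtils.marshal builds the id string, and parseJobType/parseContextKey recover the prefix fields without knowing the concrete job id class.

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;

// Hypothetical example class, not part of the repository.
public final class JobIdMarshalExample {

    public static void main(final String[] args) {
        PipelineContextKey contextKey = new PipelineContextKey("sharding_db", InstanceType.PROXY);
        // The marshalled id is a prefix ('j' + job type code + version "02" + instance type + database name as hex)
        // followed by an MD5 digest of the job id object's JSON representation (see marshalSuffix in the diff).
        String jobId = PipelineJobIdUtils.marshal(new MigrationJobId(contextKey));
        // Only the prefix is needed to recover the job type and the context key.
        PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
        PipelineContextKey parsedContextKey = PipelineJobIdUtils.parseContextKey(jobId);
        System.out.println(jobType.getClass().getSimpleName() + " -> " + parsedContextKey.getDatabaseName());
    }
}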