From c86f1c4311165acb978a77c5c61cdee57939a9c9 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 11 Nov 2023 18:27:58 +0800 Subject: [PATCH] Move PipelineJobAPI.marshalJobId() to PipelineJobId.marshal() --- .../pipeline/common/job/PipelineJobId.java | 7 +++++++ .../core/job/service/PipelineJobAPI.java | 9 --------- .../impl/AbstractPipelineJobAPIImpl.java | 8 -------- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 18 +----------------- .../data/pipeline/cdc/core/job/CDCJobId.java | 9 +++++++++ .../ConsistencyCheckJobId.java | 6 ++++++ .../api/impl/ConsistencyCheckJobAPI.java | 11 ++--------- .../scenario/migration/MigrationJobId.java | 9 +++++++++ .../migration/api/impl/MigrationJobAPI.java | 16 +--------------- .../core/job/PipelineJobIdUtilsTest.java | 4 +--- .../core/util/JobConfigurationBuilder.java | 12 +++--------- .../ConsistencyCheckJobTest.java | 3 +-- .../api/impl/ConsistencyCheckJobAPITest.java | 2 +- 13 files changed, 41 insertions(+), 73 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java index 5d67028c81be6..88ed08a492225 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java @@ -45,4 +45,11 @@ public interface PipelineJobId { * @return context key */ PipelineContextKey getContextKey(); + + /** + * Marshal job ID. + * + * @return job ID + */ + String marshal(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index 5a2bfefcbc5cc..ee0b59f54e2ad 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -24,7 +24,6 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration; @@ -40,14 +39,6 @@ @SingletonSPI public interface PipelineJobAPI extends TypedSPI { - /** - * Marshal pipeline job id. - * - * @param pipelineJobId pipeline job id - * @return marshaled text - */ - String marshalJobId(PipelineJobId pipelineJobId); - /** * Extend YAML job configuration. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java index 9a0cf65b82c5d..35e9b000d9691 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java @@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; @@ -57,13 +56,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI { protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - @Override - public final String marshalJobId(final PipelineJobId pipelineJobId) { - return PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + marshalJobIdLeftPart(pipelineJobId); - } - - protected abstract String marshalJobIdLeftPart(PipelineJobId pipelineJobId); - @Override public List list(final PipelineContextKey contextKey) { return getJobBriefInfos(contextKey).map(each -> getJobInfo(each.getJobName())).collect(Collectors.toList()); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index e203e1a2533c2..81942ca429efe 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -17,10 +17,8 @@ package org.apache.shardingsphere.data.pipeline.cdc.api.impl; -import com.google.common.base.Joiner; import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; @@ -48,7 +46,6 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper; -import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; @@ -84,7 +81,6 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine; -import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.util.Collection; import java.util.HashMap; @@ -242,7 +238,7 @@ public void updateJobConfigurationDisabled(final String jobId, final boolean dis public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) { YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig; if (null == yamlJobConfig.getJobId()) { - config.setJobId(generateJobId(contextKey, config)); + config.setJobId(new CDCJobId(contextKey, config.getSchemaTableNames(), config.isFull(), config.getSinkConfig().getSinkType()).marshal()); } if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) { PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(), @@ -251,18 +247,6 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina } } - private String generateJobId(final PipelineContextKey contextKey, final YamlCDCJobConfiguration config) { - CDCJobId jobId = new CDCJobId(contextKey, config.getSchemaTableNames(), config.isFull(), config.getSinkConfig().getSinkType()); - return marshalJobId(jobId); - } - - @Override - protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) { - CDCJobId jobId = (CDCJobId) pipelineJobId; - String text = Joiner.on('|').join(jobId.getContextKey().getDatabaseName(), jobId.getSchemaTableNames(), jobId.isFull()); - return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8)); - } - @Override public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) { CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java index 04a50813fa66b..c5e1b7a23a29b 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java @@ -17,12 +17,16 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.job; +import com.google.common.base.Joiner; import lombok.Getter; import lombok.ToString; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import java.nio.charset.StandardCharsets; import java.util.List; /** @@ -44,4 +48,9 @@ public CDCJobId(final PipelineContextKey contextKey, final List schemaTa this.full = full; this.sinkType = sinkType; } + + @Override + public String marshal() { + return PipelineJobIdUtils.marshalJobIdCommonPrefix(this) + DigestUtils.md5Hex(Joiner.on('|').join(getContextKey().getDatabaseName(), schemaTableNames, full).getBytes(StandardCharsets.UTF_8)); + } } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java index 1ad9afa5ea32e..b9fbfda2c4172 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java @@ -21,6 +21,7 @@ import lombok.ToString; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence; /** @@ -57,4 +58,9 @@ public ConsistencyCheckJobId(final PipelineContextKey contextKey, final String p public static int parseSequence(final String checkJobId) { return Integer.parseInt(checkJobId.substring(checkJobId.length() - 1)); } + + @Override + public String marshal() { + return PipelineJobIdUtils.marshalJobIdCommonPrefix(this) + parentJobId + sequence; + } } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 087ee6833942b..2f8f80c5bbd2e 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -26,7 +26,6 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper; @@ -84,12 +83,6 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl { private final YamlConsistencyCheckJobItemProgressSwapper swapper = new YamlConsistencyCheckJobItemProgressSwapper(); - @Override - protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) { - ConsistencyCheckJobId jobId = (ConsistencyCheckJobId) pipelineJobId; - return jobId.getParentJobId() + jobId.getSequence(); - } - /** * Create consistency check configuration and start job. * @@ -110,7 +103,7 @@ public String createJobAndStart(final CreateConsistencyCheckJobParameter param) } verifyPipelineDatabaseType(param); PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId); - String result = marshalJobId(latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId))); + String result = latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal(); repositoryAPI.persistLatestCheckJobId(parentJobId, result); repositoryAPI.deleteCheckJobResult(parentJobId, result); dropJob(result); @@ -235,7 +228,7 @@ public void dropByParentJobId(final String parentJobId) { Optional previousSequence = ConsistencyCheckSequence.getPreviousSequence( checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()), ConsistencyCheckJobId.parseSequence(latestCheckJobId)); if (previousSequence.isPresent()) { - String checkJobId = marshalJobId(new ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get())); + String checkJobId = new ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()).marshal(); repositoryAPI.persistLatestCheckJobId(parentJobId, checkJobId); } else { repositoryAPI.deleteLatestCheckJobId(parentJobId); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java index 543f41a81be5b..8b925d7b7225e 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java @@ -19,9 +19,13 @@ import lombok.Getter; import lombok.ToString; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.infra.util.json.JsonUtils; +import java.nio.charset.StandardCharsets; import java.util.List; /** @@ -37,4 +41,9 @@ public MigrationJobId(final PipelineContextKey contextKey, final List jo super(new MigrationJobType(), contextKey); this.jobShardingDataNodes = jobShardingDataNodes; } + + @Override + public String marshal() { + return PipelineJobIdUtils.marshalJobIdCommonPrefix(this) + DigestUtils.md5Hex(JsonUtils.toJsonString(this).getBytes(StandardCharsets.UTF_8)); + } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 2e273e15c4006..6605a1218aead 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; @@ -37,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils; @@ -91,7 +89,6 @@ import org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry; import org.apache.shardingsphere.mode.manager.ContextManager; -import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; @@ -211,12 +208,6 @@ private Map buildTargetTableSchemaMap(final Map sources = new LinkedHashMap<>(); String databaseNameSuffix = RandomStringUtils.randomAlphabetic(9); PipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration( @@ -100,11 +99,6 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration( return result; } - private static String generateMigrationJobId(final PipelineContextKey contextKey, final YamlMigrationJobConfiguration yamlJobConfig) { - MigrationJobId migrationJobId = new MigrationJobId(contextKey, yamlJobConfig.getJobShardingDataNodes()); - return new MigrationJobAPI().marshalJobId(migrationJobId); - } - private static YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final PipelineDataSourceConfiguration config) { YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration(); result.setType(config.getType()); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java index f918adf24dfa8..a54697a5b668b 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java @@ -23,7 +23,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId; -import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext; import org.apache.shardingsphere.elasticjob.api.ShardingContext; @@ -50,7 +49,7 @@ static void beforeClass() { @Test void assertBuildPipelineJobItemContext() { ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new PipelineContextKey(InstanceType.PROXY), JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId()); - String checkJobId = new ConsistencyCheckJobAPI().marshalJobId(pipelineJobId); + String checkJobId = pipelineJobId.marshal(); Map expectTableCheckPosition = Collections.singletonMap("t_order", 100); PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(checkJobId, 0, YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition))); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java index 2e49467926fc6..cb6f6f6a35d8f 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java @@ -66,7 +66,7 @@ void assertCreateJobConfig() { parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType())); ConsistencyCheckJobConfiguration checkJobConfig = checkJobAPI.getJobConfiguration(checkJobId); int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE; - String expectCheckJobId = checkJobAPI.marshalJobId(new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence)); + String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal(); assertThat(checkJobConfig.getJobId(), is(expectCheckJobId)); assertNull(checkJobConfig.getAlgorithmTypeName()); int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);