diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java index 54e492672c22e..0d2827566e57a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java @@ -20,6 +20,8 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; +import java.util.Optional; + /** * Pipeline job id. */ @@ -40,4 +42,13 @@ public interface PipelineJobId { * @return pipeline context key */ PipelineContextKey getContextKey(); + + /** + * Get sequence. + * + * @return sequence + */ + default Optional getSequence() { + return Optional.empty(); + } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java index 2ebadee92d7d0..829e2a458cf99 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java @@ -52,7 +52,7 @@ public final class PipelineJobIdUtils { * @return job id */ public static String marshal(final PipelineJobId jobId) { - return marshalPrefix(jobId.getJobType(), jobId.getContextKey()) + marshalSuffix(jobId); + return marshalPrefix(jobId.getJobType(), jobId.getContextKey()) + marshalSuffix(jobId) + jobId.getSequence().map(String::valueOf).orElse(""); } private static String marshalPrefix(final PipelineJobType jobType, final PipelineContextKey contextKey) { diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java index 06dfe9ee8ab87..526b23f97fa57 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java @@ -23,6 +23,8 @@ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence; +import java.util.Optional; + /** * Consistency check job id. */ @@ -60,4 +62,9 @@ public ConsistencyCheckJobId(final PipelineContextKey contextKey, final String p public static int parseSequence(final String checkJobId) { return Integer.parseInt(checkJobId.substring(checkJobId.length() - 1)); } + + @Override + public Optional getSequence() { + return Optional.of(sequence); + } }