From 7b15f9839242c3f11f0c7db1733f2382519f4bcf Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 18 Nov 2023 14:45:51 +0800 Subject: [PATCH] Add YamlPipelineJobItemProgressSwapper --- .../data/pipeline/core/job/service/PipelineJobAPI.java | 2 +- .../data/pipeline/core/job/service/PipelineJobManager.java | 2 +- .../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 4 ++-- .../consistencycheck/api/impl/ConsistencyCheckJobAPI.java | 2 +- .../pipeline/scenario/migration/api/impl/MigrationJobAPI.java | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index ee2faaa7d752c..b5ab26690a031 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -98,7 +98,7 @@ default Optional getToBeStoppedPreviousJobType() { * * @return pipeline job class */ - Class getPipelineJobClass(); + Class getJobClass(); @Override String getType(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index dca8e820cd49d..99d846dd6d709 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -83,7 +83,7 @@ public Optional start(final PipelineJobConfiguration jobConfig) { log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey); return Optional.of(jobId); } - repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getPipelineJobClass().getName()); + repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getJobClass().getName()); repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO())); return Optional.of(jobId); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 78ed4d4cf9c06..070328d868ee7 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -123,7 +123,7 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT if (repositoryAPI.isExisted(jobConfigKey)) { log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey); } else { - repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getPipelineJobClass().getName()); + repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClass().getName()); JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO(); jobConfigPOJO.setDisabled(true); repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO)); @@ -329,7 +329,7 @@ public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final } @Override - public Class getPipelineJobClass() { + public Class getJobClass() { return CDCJob.class; } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 0a1f7c31e3b91..62a1520c06bf6 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -312,7 +312,7 @@ public YamlConsistencyCheckJobItemProgressSwapper getYamlJobItemProgressSwapper( } @Override - public Class getPipelineJobClass() { + public Class getJobClass() { return ConsistencyCheckJob.class; } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 1f2378d05d3bb..3d12a4363bc0f 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -445,7 +445,7 @@ public void refreshTableMetadata(final String jobId, final String databaseName) } @Override - public Class getPipelineJobClass() { + public Class getJobClass() { return MigrationJob.class; }