diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java index 523b9e8997704..3d7c9b4fbcb0c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.core.job.service; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; @@ -43,6 +44,14 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI { @Override InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig); + /** + * Extend YAML job configuration. + * + * @param contextKey context key + * @param yamlJobConfig YAML job configuration + */ + void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig); + /** * Alter process configuration. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index 134bc7e7e16d1..40fb5d8a3f0b4 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -18,9 +18,7 @@ package org.apache.shardingsphere.data.pipeline.core.job.service; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; @@ -40,14 +38,6 @@ @SingletonSPI public interface PipelineJobAPI extends TypedSPI { - /** - * Extend YAML job configuration. - * - * @param contextKey context key - * @param yamlJobConfig YAML job configuration - */ - void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig); - /** * Build task configuration. * diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index ddbb7e7a89e4d..e2662e1340dfc 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -20,7 +20,6 @@ import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; @@ -355,10 +354,6 @@ public ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurati return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); } - @Override - public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) { - } - @Override public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) { throw new UnsupportedOperationException(); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java index e6d8b5e6075f9..9c3e236f37ffb 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java @@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; @@ -95,7 +96,7 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration( result.setSources(sources); result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration( ConfigurationFileUtils.readFile("migration_sharding_sphere_jdbc_target.yaml").replace("${databaseNameSuffix}", databaseNameSuffix)))); - TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION").extendYamlJobConfiguration(contextKey, result); + ((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")).extendYamlJobConfiguration(contextKey, result); return result; }