diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java index a9a630ec205c1..2564468586f5e 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java @@ -18,9 +18,7 @@ package org.apache.shardingsphere.data.pipeline.core.job.option; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; @@ -47,14 +45,6 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() { */ PipelineJobInfo getJobInfo(String jobId); - /** - * Extend YAML job configuration. - * - * @param contextKey context key - * @param yamlJobConfig YAML job configuration - */ - void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig); - /** * Build task configuration. * diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java index 87ddb7c0ed5fe..081e86e82474e 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java @@ -17,20 +17,16 @@ package org.apache.shardingsphere.data.pipeline.cdc; -import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext; -import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; @@ -84,19 +80,6 @@ public PipelineJobInfo getJobInfo(final String jobId) { return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames())); } - @Override - public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) { - YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig; - if (null == yamlJobConfig.getJobId()) { - config.setJobId(new CDCJobId(contextKey, config.getSchemaTableNames(), config.isFull(), config.getSinkConfig().getSinkType()).marshal()); - } - if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) { - PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(), - config.getDataSourceConfiguration().getParameter()); - config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType()); - } - } - @Override public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) { CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java index 80b8c0b9f9733..71c58442b9bfd 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java @@ -20,19 +20,21 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.CDCJob; +import org.apache.shardingsphere.data.pipeline.cdc.CDCJobId; import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType; -import org.apache.shardingsphere.data.pipeline.cdc.CDCJob; import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration.YamlSinkConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager; +import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress; @@ -49,10 +51,10 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; +import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap; @@ -113,7 +115,6 @@ public CDCJobAPI() { public String create(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) { PipelineContextKey contextKey = new PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY); YamlCDCJobConfiguration yamlJobConfig = getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey); - jobOption.extendYamlJobConfiguration(contextKey, yamlJobConfig); CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig); ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId())); PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())); @@ -133,6 +134,7 @@ public String create(final StreamDataParameter param, final CDCSinkType sinkType private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) { YamlCDCJobConfiguration result = new YamlCDCJobConfiguration(); + result.setJobId(new CDCJobId(contextKey, param.getSchemaTableNames(), param.isFull(), sinkType.name()).marshal()); result.setDatabaseName(param.getDatabaseName()); result.setSchemaTableNames(param.getSchemaTableNames()); result.setFull(param.isFull()); @@ -148,6 +150,8 @@ private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParam JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(param.getDataNodesMap().entrySet().stream() .map(each -> new JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1))).collect(Collectors.toList())); result.setTablesFirstDataNodes(tableFirstDataNodes.marshal()); + result.setSourceDatabaseType(PipelineDataSourceConfigurationFactory.newInstance( + result.getDataSourceConfiguration().getType(), result.getDataSourceConfiguration().getParameter()).getDatabaseType().getType()); return result; } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java index 68888ab63b312..d2554aed14b57 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java @@ -23,9 +23,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry; @@ -47,7 +45,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator; -import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; @@ -97,14 +94,6 @@ public PipelineJobInfo getJobInfo(final String jobId) { return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables)); } - @Override - public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) { - YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig; - if (null == yamlJobConfig.getJobId()) { - config.setJobId(new MigrationJobId(contextKey, config.getJobShardingDataNodes()).marshal()); - } - } - @Override public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) { MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java index 2b0beae44af54..84e63ee2345a7 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java @@ -45,6 +45,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration; @@ -92,8 +93,6 @@ @Slf4j public final class MigrationJobAPI implements TransmissionJobAPI { - private final TransmissionJobOption jobOption; - private final PipelineJobManager jobManager; private final PipelineJobConfigurationManager jobConfigManager; @@ -101,7 +100,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI { private final PipelineDataSourcePersistService dataSourcePersistService; public MigrationJobAPI() { - jobOption = new MigrationJobOption(); + TransmissionJobOption jobOption = new MigrationJobOption(); jobManager = new PipelineJobManager(jobOption); jobConfigManager = new PipelineJobConfigurationManager(jobOption); dataSourcePersistService = new PipelineDataSourcePersistService(); @@ -164,7 +163,7 @@ private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineCo result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes)); result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal()); result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList())); - jobOption.extendYamlJobConfiguration(contextKey, result); + result.setJobId(new MigrationJobId(contextKey, result.getJobShardingDataNodes()).marshal()); return result; } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java index e93a0531eff68..e39c149bd2175 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java @@ -27,15 +27,12 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; -import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper; import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; -import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.test.util.ConfigurationFileUtils; import java.sql.Connection; @@ -96,7 +93,6 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration( result.setSources(sources); result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration( ConfigurationFileUtils.readFile("migration_sharding_sphere_jdbc_target.yaml").replace("${databaseNameSuffix}", databaseNameSuffix)))); - ((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption()).extendYamlJobConfiguration(contextKey, result); return result; }