Skip to content

Commit

Permalink
Add PipelineJobConfigurationManager (#29198)
Browse files Browse the repository at this point in the history
* Add PipelineJobConfigurationLoader

* Refactor PipelineJobConfigurationLoader

* Add PipelineJobConfigurationLoader

* Add PipelineJobConfigurationManager
  • Loading branch information
terrymanu authored Nov 24, 2023
1 parent 2576aaa commit 309d6f3
Show file tree
Hide file tree
Showing 16 changed files with 131 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,13 @@

package org.apache.shardingsphere.data.pipeline.common.config.job;

import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;

/**
* Pipeline job configuration.
*/
public interface PipelineJobConfiguration {

DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

/**
* Get job id.
*
Expand All @@ -54,29 +44,4 @@ public interface PipelineJobConfiguration {
* @return source database type
*/
DatabaseType getSourceDatabaseType();

/**
* Convert to job configuration POJO.
*
* @return converted job configuration POJO
*/
default JobConfigurationPOJO convertToJobConfigurationPOJO() {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(getJobId());
result.setShardingTotalCount(getJobShardingCount());
result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration()));
String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER);
result.getProps().setProperty("create_time", createTimeFormat);
result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
result.getProps().setProperty("run_count", "1");
result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
return result;
}

/**
* Swap to YAML pipeline job configuration.
*
* @return swapped YAML pipeline job configuration
*/
YamlPipelineJobConfiguration swapToYamlJobConfiguration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;

import java.util.Optional;

Expand All @@ -36,9 +38,11 @@ public interface PipelineJobAPI extends TypedSPI {
/**
* Get YAML pipeline job configuration swapper.
*
* @param <T> type of YAML configuration
* @param <Y> type of pipeline job configuration
* @return YAML pipeline job configuration swapper
*/
YamlPipelineJobConfigurationSwapper<?, ?> getYamlJobConfigurationSwapper();
<Y extends YamlConfiguration, T extends PipelineJobConfiguration> YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper();

/**
* Get YAML pipeline job item progress swapper.
Expand Down Expand Up @@ -75,6 +79,15 @@ default Optional<String> getToBeStoppedPreviousJobType() {
return Optional.empty();
}

/**
 * Whether to force sharding total count to 1 (no sharding) when converting to job configuration POJO.
 *
 * @return {@code true} to force no sharding; {@code false} (the default) to use the job's own sharding count
 */
default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
    return false;
}

/**
* Get pipeline job class.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.job.service;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;

/**
 * Pipeline job configuration manager.
 */
@RequiredArgsConstructor
public final class PipelineJobConfigurationManager {
    
    // Timestamp format used for the "create_time" property of the job configuration POJO.
    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    
    private final PipelineJobAPI jobAPI;
    
    /**
     * Get job configuration.
     *
     * @param jobId job ID
     * @param <T> type of pipeline job configuration
     * @return pipeline job configuration
     */
    @SuppressWarnings("unchecked")
    public <T extends PipelineJobConfiguration> T getJobConfiguration(final String jobId) {
        String jobParam = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter();
        return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobParam);
    }
    
    /**
     * Convert to job configuration POJO.
     *
     * @param jobConfig pipeline job configuration
     * @return converted job configuration POJO
     */
    public JobConfigurationPOJO convertToJobConfigurationPOJO(final PipelineJobConfiguration jobConfig) {
        JobConfigurationPOJO result = new JobConfigurationPOJO();
        result.setJobName(jobConfig.getJobId());
        // Some job types (e.g. CDC) always run as a single sharding item regardless of the configured sharding count.
        if (jobAPI.isForceNoShardingWhenConvertToJobConfigurationPOJO()) {
            result.setShardingTotalCount(1);
        } else {
            result.setShardingTotalCount(jobConfig.getJobShardingCount());
        }
        result.setJobParameter(YamlEngine.marshal(jobAPI.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
        result.getProps().setProperty("create_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
        result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
        result.getProps().setProperty("run_count", "1");
        result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
        return result;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,6 @@ public final class PipelineJobManager {

private final PipelineJobAPI jobAPI;

/**
* Get job configuration.
*
* @param jobId job ID
* @param <T> type of pipeline job configuration
* @return pipeline job configuration
*/
@SuppressWarnings("unchecked")
public <T extends PipelineJobConfiguration> T getJobConfiguration(final String jobId) {
return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
}

/**
* Start job.
*
Expand All @@ -80,7 +68,7 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
return Optional.of(jobId);
}
governanceFacade.getJobFacade().getJob().create(jobId, jobAPI.getJobClass());
governanceFacade.getJobFacade().getConfiguration().persist(jobId, jobConfig.convertToJobConfigurationPOJO());
governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobAPI).convertToJobConfigurationPOJO(jobConfig));
return Optional.of(jobId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte
* @return job item infos
*/
public List<TransmissionJobItemInfo> getJobItemInfos(final String jobId) {
PipelineJobConfiguration jobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(jobId);
PipelineJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobAPI).getJobConfiguration(jobId);
long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, TransmissionJobItemProgress> jobProgress = getJobProgress(jobConfig);
List<TransmissionJobItemInfo> result = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
Expand Down Expand Up @@ -48,7 +48,7 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme
String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
MigrationJobConfiguration jobConfig = new PipelineJobManager(migrationJobAPI).getJobConfiguration(jobId);
MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(migrationJobAPI).getJobConfiguration(jobId);
verifyInventoryFinished(jobConfig);
checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
Expand All @@ -66,11 +66,12 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
Expand All @@ -85,6 +86,7 @@

import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand All @@ -100,6 +102,8 @@
@Slf4j
public final class CDCJobAPI implements TransmissionJobAPI {

private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();

private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
Expand All @@ -125,7 +129,7 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT
log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
} else {
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), getJobClass());
JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO();
JobConfigurationPOJO jobConfigPOJO = new PipelineJobConfigurationManager(this).convertToJobConfigurationPOJO(jobConfig);
jobConfigPOJO.setDisabled(true);
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO);
if (!param.isFull()) {
Expand Down Expand Up @@ -222,7 +226,7 @@ public void updateJobConfigurationDisabled(final String jobId, final boolean dis
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
jobConfigPOJO.setDisabled(disabled);
if (disabled) {
jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(PipelineJobConfiguration.DATE_TIME_FORMATTER));
jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis()));
} else {
jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
Expand Down Expand Up @@ -282,6 +286,7 @@ public CDCProcessContext buildProcessContext(final PipelineJobConfiguration jobC
return new CDCProcessContext(jobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
}

@SuppressWarnings("unchecked")
@Override
public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
return new YamlCDCJobConfigurationSwapper();
Expand All @@ -290,10 +295,15 @@ public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
@Override
public PipelineJobInfo getJobInfo(final String jobId) {
PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
}

@Override
public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
    // CDC jobs run as a single sharding item: forces sharding total count to 1 when converting to job configuration POJO.
    return true;
}

@Override
public void commit(final String jobId) {
    // Intentionally empty: no commit action appears to be needed for CDC jobs — NOTE(review): confirm against scenario design.
}
Expand All @@ -304,7 +314,7 @@ public void commit(final String jobId) {
* @param jobId job id
*/
public void dropStreaming(final String jobId) {
CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
new PipelineJobManager(this).drop(jobId);
cleanup(jobConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

import java.util.List;
Expand Down Expand Up @@ -67,18 +64,6 @@ public int getJobShardingCount() {
return jobShardingDataNodes.size();
}

@Override
public JobConfigurationPOJO convertToJobConfigurationPOJO() {
JobConfigurationPOJO result = PipelineJobConfiguration.super.convertToJobConfigurationPOJO();
result.setShardingTotalCount(1);
return result;
}

@Override
public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
return new YamlCDCJobConfigurationSwapper().swapToYamlConfiguration(this);
}

@RequiredArgsConstructor
@Getter
public static class SinkConfiguration {
Expand Down
Loading

0 comments on commit 309d6f3

Please sign in to comment.