Skip to content

Commit

Permalink
Refactor TransmissionJobManager (#29264)
Browse files Browse the repository at this point in the history
* Refactor TransmissionJobManager

* Refactor TransmissionJobManager
  • Loading branch information
terrymanu authored Dec 2, 2023
1 parent 8c11cdc commit 582988f
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -48,46 +45,13 @@ public final class TransmissionJobManager {

private final TransmissionJobOption jobOption;

private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

/**
* Alter process configuration.
*
* @param contextKey context key
* @param processConfig process configuration
*/
public void alterProcessConfiguration(final PipelineContextKey contextKey, final PipelineProcessConfiguration processConfig) {
// TODO check rateLimiter type match or not
processConfigPersistService.persist(contextKey, jobOption.getType(), processConfig);
}

/**
* Show pipeline process configuration.
*
* @param jobId job id
* @return pipeline process configuration
*/
public PipelineProcessConfiguration showProcessConfiguration(final String jobId) {
return showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobId));
}

/**
* Show pipeline process configuration.
*
* @param contextKey context key
* @return pipeline process configuration
*/
public PipelineProcessConfiguration showProcessConfiguration(final PipelineContextKey contextKey) {
return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, jobOption.getType()));
}

/**
* Get job infos.
*
* @param jobId job ID
* @return job item infos
*/
public List<TransmissionJobItemInfo> getJobItemInfos(final String jobId) {
public Collection<TransmissionJobItemInfo> getJobItemInfos(final String jobId) {
PipelineJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, TransmissionJobItemProgress> jobProgress = getJobProgress(jobConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -42,7 +41,7 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingStatusStatement sqlStatement) {
TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, new CDCJobType().getType()).getOption();
List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
Collection<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,29 @@

import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;

import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Collections;

/**
* Show streaming rule executor.
*/
public final class ShowStreamingRuleExecutor implements QueryableRALExecutor<ShowStreamingRuleStatement> {

private final PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService();

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingRuleStatement sqlStatement) {
PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "STREAMING").getOption())
.showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
return result;
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(persistService.load(new PipelineContextKey(InstanceType.PROXY), "STREAMING"));
return Collections.singleton(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
}

private String getString(final Object obj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -41,7 +40,7 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationStatusStatement sqlStatement) {
TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption();
List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
Collection<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
Expand All @@ -58,7 +59,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
Expand Down Expand Up @@ -90,7 +91,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {

private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());

private final TransmissionJobManager transmissionJobManager = new TransmissionJobManager(jobOption);
private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

private final CDCJobPreparer jobPreparer = new CDCJobPreparer();

Expand Down Expand Up @@ -132,7 +133,8 @@ public void execute(final ShardingContext shardingContext) {

private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(jobConfig.getJobId());
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobOption.getType()));
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
Expand All @@ -37,7 +38,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
Expand All @@ -61,6 +62,8 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {

private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());

private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

@Getter
private final ConsistencyCheckJobItemContext jobItemContext;

Expand Down Expand Up @@ -108,7 +111,8 @@ protected void runBlocking() {
TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
try {
PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(parentJobConfig.getJobId());
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()), jobType.getType()));
PipelineDataConsistencyChecker checker = jobOption.buildDataConsistencyChecker(
parentJobConfig, new TransmissionProcessContext(parentJobConfig.getJobId(), processConfig), jobItemContext.getProgressContext());
consistencyChecker.set(checker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
Expand All @@ -37,8 +38,9 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
Expand Down Expand Up @@ -68,10 +70,10 @@ public final class MigrationJob extends AbstractSimplePipelineJob {

private final MigrationJobOption jobOption = new MigrationJobOption();

private final TransmissionJobManager transmissionJobManager = new TransmissionJobManager(jobOption);

private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());

private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();

// Shared by all sharding items
Expand All @@ -86,7 +88,8 @@ protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingC
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(jobConfig.getJobId());
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobOption.getType()));
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
Expand Down
Loading

0 comments on commit 582988f

Please sign in to comment.