Refactor PipelineJobManager #29207

Merged: 6 commits, merged on Nov 25, 2023

@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.common.job;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
- import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;

/**
* Pipeline job id.
@@ -30,7 +30,7 @@ public interface PipelineJobId {
*
* @return type
*/
- JobType getJobType();
+ PipelineJobType getJobType();

/**
* Get format version.

@@ -33,10 +33,10 @@
@Slf4j
public final class JobCodeRegistry {

- private static final Map<String, JobType> JOB_CODE_AND_TYPE_MAP = new HashMap<>();
+ private static final Map<String, PipelineJobType> JOB_CODE_AND_TYPE_MAP = new HashMap<>();

static {
- for (JobType each : ShardingSphereServiceLoader.getServiceInstances(JobType.class)) {
+ for (PipelineJobType each : ShardingSphereServiceLoader.getServiceInstances(PipelineJobType.class)) {
Preconditions.checkArgument(2 == each.getCode().length(), "Job type code length is not 2.");
JOB_CODE_AND_TYPE_MAP.put(each.getCode(), each);
}
@@ -48,7 +48,7 @@ public final class JobCodeRegistry {
* @param jobTypeCode job type code
* @return job type
*/
- public static JobType getJobType(final String jobTypeCode) {
+ public static PipelineJobType getJobType(final String jobTypeCode) {
Preconditions.checkArgument(JOB_CODE_AND_TYPE_MAP.containsKey(jobTypeCode), "Can not get job type by `%s`.", jobTypeCode);
return JOB_CODE_AND_TYPE_MAP.get(jobTypeCode);
}
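
For reference, a minimal usage sketch of the refactored registry; it relies only on signatures visible in this diff, and "00" is the code registered by the FixtureJobType test fixture further down:

```java
import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;

public final class JobCodeLookupSketch {
    
    public static void main(final String[] args) {
        // getJobType(...) now returns PipelineJobType instead of JobType.
        PipelineJobType jobType = JobCodeRegistry.getJobType("00");
        // The job option is reached through the type rather than a separate SPI lookup.
        System.out.println(jobType.getType() + " -> " + jobType.getOption());
    }
}
```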

@@ -17,14 +17,15 @@

package org.apache.shardingsphere.data.pipeline.common.job.type;

+ import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

/**
- * Job type.
+ * Pipeline job type.
*/
@SingletonSPI
- public interface JobType extends TypedSPI {
+ public interface PipelineJobType extends TypedSPI {

/**
* Get job type code.
@@ -33,6 +34,13 @@ public interface JobType extends TypedSPI {
*/
String getCode();

+ /**
+ * Get job option.
+ *
+ * @return job option
+ */
+ PipelineJobOption getOption();

@Override
String getType();
}
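
The rest of the diff follows from this interface: PipelineJobOption stops being a TypedSPI, and callers load the PipelineJobType SPI by name and take its option. A condensed before/after sketch of that lookup (types and packages as shown in this PR):

```java
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

public final class JobOptionLookupSketch {
    
    public static PipelineJobOption loadOption(final String jobTypeName) {
        // Before this PR: TypedSPILoader.getService(PipelineJobOption.class, jobTypeName)
        // After this PR: the SPI entry point is PipelineJobType; the option hangs off it.
        return TypedSPILoader.getService(PipelineJobType.class, jobTypeName).getOption();
    }
}
```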

@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.impl;

import lombok.extern.slf4j.Slf4j;
- import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.JobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
@@ -89,7 +89,7 @@ protected void executeJob(final JobConfiguration jobConfig) {

protected abstract AbstractPipelineJob buildPipelineJob(String jobId);

- protected abstract JobType getJobType();
+ protected abstract PipelineJobType getJobType();

@Override
public String getType() {

@@ -22,6 +22,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
+ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
@@ -65,7 +66,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {

protected AbstractPipelineJob(final String jobId) {
this.jobId = jobId;
- jobOption = TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobId).getType());
+ jobOption = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption();
}

/**

@@ -22,7 +22,7 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
- import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;

/**
* Abstract pipeline job id.
@@ -33,7 +33,7 @@ public abstract class AbstractPipelineJobId implements PipelineJobId {

public static final String CURRENT_VERSION = "02";

- private final JobType jobType;
+ private final PipelineJobType jobType;

private final PipelineContextKey contextKey;


@@ -27,7 +27,7 @@
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
- import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.util.InstanceTypeUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -64,7 +64,7 @@ public static String marshalJobIdCommonPrefix(final PipelineJobId pipelineJobId)
* @param jobId job id
* @return job type
*/
- public static JobType parseJobType(final String jobId) {
+ public static PipelineJobType parseJobType(final String jobId) {
verifyJobId(jobId);
return JobCodeRegistry.getJobType(jobId.substring(1, 3));
}

@@ -24,7 +24,6 @@
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
- import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;

import java.util.Optional;
@@ -33,7 +32,7 @@
* Pipeline job option.
*/
@SingletonSPI
- public interface PipelineJobOption extends TypedSPI {
+ public interface PipelineJobOption {

/**
* Get YAML pipeline job configuration swapper.
@@ -95,6 +94,10 @@ default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
*/
Class<? extends PipelineJob> getJobClass();

- @Override
+ /**
+ * Get job type.
+ *
+ * @return job type
+ */
String getType();
}

@@ -21,9 +21,9 @@
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
- import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -130,7 +130,7 @@ private static synchronized void persist(final String jobId, final int shardingI
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
- new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobId).getType())
+ new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption()
.getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {

@@ -58,8 +58,7 @@ public <T extends PipelineJobConfiguration> T getJobConfiguration(final String j
public JobConfigurationPOJO convertToJobConfigurationPOJO(final PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(jobConfig.getJobId());
- int shardingTotalCount = jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount();
- result.setShardingTotalCount(shardingTotalCount);
+ result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount());
result.setJobParameter(YamlEngine.marshal(jobOption.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
String createTimeFormat = LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter());
result.getProps().setProperty("create_time", createTimeFormat);

@@ -23,6 +23,7 @@
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
@@ -57,19 +58,17 @@ public final class PipelineJobManager {
* Start job.
*
* @param jobConfig job configuration
- * @return job id
*/
- public Optional<String> start(final PipelineJobConfiguration jobConfig) {
+ public void start(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobId)) {
log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
- return Optional.of(jobId);
+ return;
}
governanceFacade.getJobFacade().getJob().create(jobId, jobOption.getJobClass());
governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig));
- return Optional.of(jobId);
}

/**
@@ -109,7 +108,7 @@ private void startCurrentDisabledJob(final String jobId) {
private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
- new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, toBeStartDisabledNextJobType)).startDisabledJob(optional);
+ new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStartDisabledNextJobType).getOption()).startDisabledJob(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -131,7 +130,7 @@ public void stop(final String jobId) {
private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
- new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, toBeStoppedPreviousJobType)).stop(optional);
+ new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStoppedPreviousJobType).getOption()).stop(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
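
A caller-facing note on the start(...) hunk above: it no longer returns Optional<String>, so call sites that need the job id read it from the configuration they already hold. A minimal sketch; the package locations of PipelineJobManager and PipelineJobConfiguration are assumptions, since this diff does not show them:

```java
// Sketch only: both import paths below are assumptions, not shown in this diff.
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;

public final class StartJobCallSiteSketch {
    
    public static String startAndGetJobId(final PipelineJobManager jobManager, final PipelineJobConfiguration jobConfig) {
        // start(...) is void after this PR; the id was already part of the job configuration.
        jobManager.start(jobConfig);
        return jobConfig.getJobId();
    }
}
```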

@@ -26,6 +26,7 @@
import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
@@ -66,7 +67,7 @@ public TransmissionTasksRunner(final TransmissionJobItemContext jobItemContext)
this.jobItemContext = jobItemContext;
inventoryTasks = jobItemContext.getInventoryTasks();
incrementalTasks = jobItemContext.getIncrementalTasks();
- jobOption = TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
+ jobOption = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption();
jobManager = new PipelineJobManager(jobOption);
jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
}
@@ -89,7 +90,7 @@ public void start() {
if (jobItemContext.isStopping()) {
return;
}
- new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
+ new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption()
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
if (PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
log.info("All inventory tasks finished.");

@@ -17,16 +17,25 @@

package org.apache.shardingsphere.data.pipeline.common.job.type;

+ import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;

+ import static org.mockito.Mockito.mock;

/**
* Fixture job type.
*/
- public final class FixtureJobType implements JobType {
+ public final class FixtureJobType implements PipelineJobType {

@Override
public String getCode() {
return "00";
}

+ @Override
+ public PipelineJobOption getOption() {
+ return mock(PipelineJobOption.class);
+ }

@Override
public String getType() {
return "FIXTURE";

@@ -20,8 +20,8 @@
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
- import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -41,7 +41,7 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingStatusStatement sqlStatement) {
- TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, new CDCJobType().getType());
+ TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, new CDCJobType().getType()).getOption();
List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());

@@ -20,7 +20,7 @@
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
- import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -40,7 +40,7 @@ public final class ShowStreamingRuleExecutor implements QueryableRALExecutor<Sho

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingRuleStatement sqlStatement) {
- PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "STREAMING"))
+ PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "STREAMING").getOption())
.showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));

@@ -18,8 +18,8 @@
package org.apache.shardingsphere.migration.distsql.handler.query;

import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
- import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -40,7 +40,7 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationStatusStatement sqlStatement) {
- TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION");
+ TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption();
List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
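
The DistSQL executors above all share the same lookup shape: resolve the PipelineJobType SPI by name, take its option, and downcast to TransmissionJobOption where transmission-specific behavior is needed. The cast is required because PipelineJobOption is no longer a TypedSPI, so the concrete option type is only known to its job type. A condensed sketch using the "MIGRATION" type name from the executor above:

```java
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

public final class TransmissionOptionLookupSketch {
    
    public static TransmissionJobManager newMigrationJobManager() {
        // The cast mirrors the executor code in this diff; it assumes the MIGRATION
        // job type's option is a TransmissionJobOption.
        TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption();
        return new TransmissionJobManager(jobOption);
    }
}
```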