Move PipelineJobAPI.getJobInfo() to InventoryIncrementalJobAPI
terrymanu committed Nov 16, 2023
1 parent 68a5d5f commit 4587e96
Showing 4 changed files with 31 additions and 52 deletions.

Changed file 1 of 4 (InventoryIncrementalJobAPI):

@@ -26,9 +26,11 @@
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 
 import java.sql.SQLException;
 import java.util.Collection;

@@ -41,7 +43,30 @@
  */
 public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
 
-    @Override
+    /**
+     * Get pipeline job info.
+     *
+     * @param jobId job ID
+     * @return pipeline job info
+     */
+    PipelineJobInfo getJobInfo(String jobId);
+
+    /**
+     * Build task configuration.
+     *
+     * @param pipelineJobConfig pipeline job configuration
+     * @param jobShardingItem job sharding item
+     * @param pipelineProcessConfig pipeline process configuration
+     * @return task configuration
+     */
+    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
+
+    /**
+     * Build pipeline process context.
+     *
+     * @param pipelineJobConfig pipeline job configuration
+     * @return pipeline process context
+     */
     InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
 
     /**
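
With this change, getJobInfo() (along with buildTaskConfiguration() and buildPipelineProcessContext()) is declared only on InventoryIncrementalJobAPI, so code holding a generic PipelineJobAPI reference has to narrow the type before calling it, as the third changed file below does. A minimal caller-side sketch of that pattern; the helper class is hypothetical and the import path of InventoryIncrementalJobAPI is an assumption, not something shown in this commit:

import java.util.Optional;

import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; // assumed location; adjust to the interface's actual package
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;

// Hypothetical helper, not part of this commit.
public final class JobInfoLookupSketch {
    
    private JobInfoLookupSketch() {
    }
    
    /**
     * Narrow a generic job API to InventoryIncrementalJobAPI before calling getJobInfo().
     *
     * @param jobAPI generic pipeline job API, for example one resolved through its TypedSPI type
     * @param jobId job ID
     * @return pipeline job info, or empty if the job type does not expose it
     */
    public static Optional<PipelineJobInfo> findJobInfo(final PipelineJobAPI jobAPI, final String jobId) {
        if (jobAPI instanceof InventoryIncrementalJobAPI) {
            return Optional.ofNullable(((InventoryIncrementalJobAPI) jobAPI).getJobInfo(jobId));
        }
        return Optional.empty();
    }
}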

Changed file 2 of 4 (PipelineJobAPI):

@@ -18,14 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

@@ -38,24 +34,6 @@
 @SingletonSPI
 public interface PipelineJobAPI extends TypedSPI {
 
-    /**
-     * Build task configuration.
-     *
-     * @param pipelineJobConfig pipeline job configuration
-     * @param jobShardingItem job sharding item
-     * @param pipelineProcessConfig pipeline process configuration
-     * @return task configuration
-     */
-    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
-
-    /**
-     * Build pipeline process context.
-     *
-     * @param pipelineJobConfig pipeline job configuration
-     * @return pipeline process context
-     */
-    PipelineProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
-
     /**
      * Get job configuration.
      *

@@ -99,14 +77,6 @@ default Optional<String> getToBeStoppedPreviousJobType() {
         return Optional.empty();
     }
 
-    /**
-     * Get pipeline job info.
-     *
-     * @param jobId job ID
-     * @return pipeline job info
-     */
-    PipelineJobInfo getJobInfo(String jobId);
-
     /**
      * Persist job item progress.
      *

Changed file 3 of 4:

@@ -39,6 +39,7 @@
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;

@@ -178,7 +179,10 @@ public void drop(final String jobId) {
      * @return jobs info
      */
     public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey contextKey) {
-        return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each -> pipelineJobAPI.getJobInfo(each.getJobName())).collect(Collectors.toList());
+        if (pipelineJobAPI instanceof InventoryIncrementalJobAPI) {
+            return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI) pipelineJobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
+        }
+        return Collections.emptyList();
     }
 
     private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey, final String jobType) {
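
The rewritten getPipelineJobInfos() above dispatches on the concrete API type and falls back to an empty list. A small sketch of the observable effect, assuming the method shown lives on PipelineJobManager (the class imported in the fourth changed file below) and wraps the pipelineJobAPI field used above; the helper class itself is hypothetical:

import java.util.List;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; // assumed owner of getPipelineJobInfos()

// Hypothetical helper, not part of this commit.
public final class JobListingSketch {
    
    private JobListingSketch() {
    }
    
    /**
     * Count the job infos visible for a context. When the wrapped job API does not implement
     * InventoryIncrementalJobAPI, this now counts zero instead of failing inside getJobInfo().
     *
     * @param jobManager pipeline job manager wrapping a concrete PipelineJobAPI
     * @param contextKey pipeline context key
     * @return number of jobs reported for the context
     */
    public static int countJobInfos(final PipelineJobManager jobManager, final PipelineContextKey contextKey) {
        List<PipelineJobInfo> jobInfos = jobManager.getPipelineJobInfos(contextKey);
        return jobInfos.size();
    }
}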

Changed file 4 of 4:

@@ -19,17 +19,13 @@
 
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;

@@ -42,7 +38,6 @@
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;

@@ -354,21 +349,6 @@ public ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurati
         return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
 
-    @Override
-    public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public Class<ConsistencyCheckJob> getPipelineJobClass() {
         return ConsistencyCheckJob.class;
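
Taken together, the four files amount to an interface-segregation move: an operation that only inventory/incremental job types can honor leaves the shared SPI, the implementer that could not support it (the consistency check API above) drops its UnsupportedOperationException stubs, and callers dispatch on the narrower interface. A self-contained toy mirror of the same pattern, using made-up types rather than ShardingSphere's real ones:

import java.util.Optional;

// Toy mirror of this commit's refactoring; all types here are illustrative.
public final class InterfaceSegregationSketch {
    
    // Shared SPI-style interface: only capabilities every job type supports stay here.
    interface JobAPI {
        
        String getType();
    }
    
    // Capability moved down to a sub-interface, like getJobInfo() in this commit.
    interface InventoryJobAPI extends JobAPI {
        
        String getJobInfo(String jobId);
    }
    
    // An implementer that cannot support the capability no longer needs a throwing stub.
    static final class CheckJobAPI implements JobAPI {
        
        @Override
        public String getType() {
            return "CONSISTENCY_CHECK";
        }
    }
    
    static final class MigrationStyleJobAPI implements InventoryJobAPI {
        
        @Override
        public String getType() {
            return "MIGRATION";
        }
        
        @Override
        public String getJobInfo(final String jobId) {
            return "info of " + jobId;
        }
    }
    
    // Callers narrow to the sub-interface, mirroring getPipelineJobInfos() above.
    static Optional<String> findJobInfo(final JobAPI jobAPI, final String jobId) {
        return jobAPI instanceof InventoryJobAPI ? Optional.of(((InventoryJobAPI) jobAPI).getJobInfo(jobId)) : Optional.empty();
    }
    
    public static void main(final String[] args) {
        System.out.println(findJobInfo(new MigrationStyleJobAPI(), "j0001")); // prints Optional[info of j0001]
        System.out.println(findJobInfo(new CheckJobAPI(), "j0001")); // prints Optional.empty
    }
}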
