Skip to content

Commit

Permalink
Refactor PipelineJobManager (#29077)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 18, 2023
1 parent c4f1ef1 commit b76937e
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
Expand All @@ -47,7 +46,6 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Pipeline job manager.
Expand Down Expand Up @@ -191,18 +189,15 @@ public void drop(final String jobId) {
* @param contextKey context key
* @return jobs info
*/
public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey contextKey) {
public List<PipelineJobInfo> getJobInfos(final PipelineContextKey contextKey) {
if (jobAPI instanceof InventoryIncrementalJobAPI) {
return getJobBriefInfos(contextKey, jobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
.filter(each -> !each.getJobName().startsWith("_") && jobAPI.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
.map(each -> ((InventoryIncrementalJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
}
return Collections.emptyList();
}

private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey, final String jobType) {
return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
.filter(each -> jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
}

/**
* Get job item progress.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public final class ShowStreamingListExecutor implements QueryableRALExecutor<Sho

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingListStatement sqlStatement) {
return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(),
each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final class ShowMigrationListExecutor implements QueryableRALExecutor<Sho

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationListStatement sqlStatement) {
return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(), each.getJobMetaData().getStopTime())).collect(Collectors.toList());
Expand Down

0 comments on commit b76937e

Please sign in to comment.