From b76937ea73a467521d9916b9a605d0c521ac777f Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sat, 18 Nov 2023 19:38:55 +0800 Subject: [PATCH] Refactor PipelineJobManager (#29077) --- .../core/job/service/PipelineJobManager.java | 13 ++++--------- .../handler/query/ShowStreamingListExecutor.java | 2 +- .../handler/query/ShowMigrationListExecutor.java | 2 +- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index 3eab31d08af5a..4ffeaafbee988 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -35,7 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration; import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; -import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; @@ -47,7 +46,6 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Pipeline job manager. @@ -191,18 +189,15 @@ public void drop(final String jobId) { * @param contextKey context key * @return jobs info */ - public List getPipelineJobInfos(final PipelineContextKey contextKey) { + public List getJobInfos(final PipelineContextKey contextKey) { if (jobAPI instanceof InventoryIncrementalJobAPI) { - return getJobBriefInfos(contextKey, jobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList()); + return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream() + .filter(each -> !each.getJobName().startsWith("_") && jobAPI.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType())) + .map(each -> ((InventoryIncrementalJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList()); } return Collections.emptyList(); } - private Stream getJobBriefInfos(final PipelineContextKey contextKey, final String jobType) { - return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_")) - .filter(each -> jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType())); - } - /** * Get job item progress. * diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java index f6ca1ea172a3f..c5c64bb35ffa4 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java @@ -40,7 +40,7 @@ public final class ShowStreamingListExecutor implements QueryableRALExecutor getRows(final ShowStreamingListStatement sqlStatement) { - return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(), + return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(), ((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList()); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java index a731a088a97e4..ce1877a57197d 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java @@ -39,7 +39,7 @@ public final class ShowMigrationListExecutor implements QueryableRALExecutor getRows(final ShowMigrationListStatement sqlStatement) { - return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(), + return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(), ((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), each.getJobMetaData().getCreateTime(), each.getJobMetaData().getStopTime())).collect(Collectors.toList());