Skip to content

Commit

Permalink
Refactor TransmissionJobManager
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Dec 2, 2023
1 parent 51834fc commit d2a25d1
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -87,7 +88,7 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte
* @param jobId job ID
* @return job item infos
*/
public List<TransmissionJobItemInfo> getJobItemInfos(final String jobId) {
public Collection<TransmissionJobItemInfo> getJobItemInfos(final String jobId) {
PipelineJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, TransmissionJobItemProgress> jobProgress = getJobProgress(jobConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -42,7 +41,7 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingStatusStatement sqlStatement) {
TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, new CDCJobType().getType()).getOption();
List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
Collection<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -41,7 +40,7 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationStatusStatement sqlStatement) {
TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption();
List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
Collection<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,9 @@ void assertGetJobItemInfosAtBegin() {
yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
yamlJobItemProgress.setSourceDatabaseType("MySQL");
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, YamlEngine.marshal(yamlJobItemProgress));
List<TransmissionJobItemInfo> jobItemInfos = transmissionJobManager.getJobItemInfos(jobConfig.getJobId());
Collection<TransmissionJobItemInfo> jobItemInfos = transmissionJobManager.getJobItemInfos(jobConfig.getJobId());
assertThat(jobItemInfos.size(), is(1));
TransmissionJobItemInfo jobItemInfo = jobItemInfos.get(0);
TransmissionJobItemInfo jobItemInfo = jobItemInfos.iterator().next();
assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.RUNNING));
assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(0));
}
Expand All @@ -330,8 +330,8 @@ void assertGetJobItemInfosAtIncrementTask() {
yamlJobItemProgress.setProcessedRecordsCount(100);
yamlJobItemProgress.setInventoryRecordsCount(50);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, YamlEngine.marshal(yamlJobItemProgress));
List<TransmissionJobItemInfo> jobItemInfos = transmissionJobManager.getJobItemInfos(jobConfig.getJobId());
TransmissionJobItemInfo jobItemInfo = jobItemInfos.get(0);
Collection<TransmissionJobItemInfo> jobItemInfos = transmissionJobManager.getJobItemInfos(jobConfig.getJobId());
TransmissionJobItemInfo jobItemInfo = jobItemInfos.stream().iterator().next();
assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK));
assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(100));
}
Expand Down

0 comments on commit d2a25d1

Please sign in to comment.