Refactor PipelineJobInfo (#29105)
* Refactor PipelineJobInfo

* Refactor PipelineJobInfo
terrymanu authored Nov 21, 2023
1 parent 9927811 commit fe845fd
Showing 7 changed files with 23 additions and 62 deletions.
PipelineJobInfo.java
@@ -17,15 +17,20 @@
 
 package org.apache.shardingsphere.data.pipeline.common.pojo;
 
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
 /**
- * Pipeline job meta data.
+ * Pipeline job info.
  */
-public interface PipelineJobInfo {
-
-    /**
-     * Get job meta data.
-     *
-     * @return job meta data
-     */
-    PipelineJobMetaData getJobMetaData();
+@RequiredArgsConstructor
+@Getter
+public final class PipelineJobInfo {
+
+    private final PipelineJobMetaData jobMetaData;
+
+    private final String databaseName;
+
+    // TODO Rename
+    private final String table;
 }
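For readers unfamiliar with Lombok: @RequiredArgsConstructor generates a constructor over the final fields in declaration order, and @Getter generates one getter per field. A minimal sketch of the equivalent hand-written class (an illustrative expansion, not code from this commit):

// Illustrative de-Lombok expansion of the refactored PipelineJobInfo.
public final class PipelineJobInfo {
    
    private final PipelineJobMetaData jobMetaData;
    
    private final String databaseName;
    
    // TODO Rename
    private final String table;
    
    public PipelineJobInfo(final PipelineJobMetaData jobMetaData, final String databaseName, final String table) {
        this.jobMetaData = jobMetaData;
        this.databaseName = databaseName;
        this.table = table;
    }
    
    public PipelineJobMetaData getJobMetaData() {
        return jobMetaData;
    }
    
    public String getDatabaseName() {
        return databaseName;
    }
    
    public String getTable() {
        return table;
    }
}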

TableBasedPipelineJobInfo.java
This file was deleted.

@@ -25,7 +25,7 @@
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -80,7 +80,7 @@ public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId)
         long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
         List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
-        TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId);
+        PipelineJobInfo jobInfo = jobAPI.getJobInfo(jobId);
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
             InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
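The effect on callers is visible in this hunk: getJobInfo(jobId) previously returned the PipelineJobInfo interface, which exposed only getJobMetaData(), so reaching table metadata required a downcast to the TableBasedPipelineJobInfo subclass. With the single final class, the fields are read directly. A hypothetical before/after caller, sketched against the types in this commit:

// Before the refactor: a downcast was needed to reach the table name.
TableBasedPipelineJobInfo before = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId);
String tableBefore = before.getTable();

// After the refactor: the final class exposes the field itself.
PipelineJobInfo after = jobAPI.getJobInfo(jobId);
String tableAfter = after.getTable();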
ShowStreamingListExecutor.java
@@ -20,7 +20,6 @@
 import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -41,7 +40,7 @@ public final class ShowStreamingListExecutor implements QueryableRALExecutor<Sho
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingListStatement sqlStatement) {
         return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
-                ((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(),
+                each.getDatabaseName(), each.getTable(),
                 each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
     }
ShowMigrationListExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -40,7 +39,7 @@ public final class ShowMigrationListExecutor implements QueryableRALExecutor<Sho
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationListStatement sqlStatement) {
         return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
-                ((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(),
+                each.getTable(), each.getJobMetaData().getJobItemCount(),
                 each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), each.getJobMetaData().getStopTime())).collect(Collectors.toList());
     }
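Both DistSQL list executors follow the same pattern: stream the job infos and map each one to a result row, now reading getDatabaseName() and getTable() without an instanceof/cast step. A compact sketch of that mapping (the flat row shape here is an assumption for illustration; the real executors build LocalDataQueryResultRow instances and assume PipelineJobInfo is on the classpath):

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

final class JobInfoRowMappingSketch {
    
    // Maps each job info to a flat row, mirroring the executors above.
    static Collection<List<Object>> toRows(final Collection<PipelineJobInfo> jobInfos) {
        return jobInfos.stream()
                .map(each -> Arrays.<Object>asList(
                        each.getJobMetaData().getJobId(),
                        each.getDatabaseName(),
                        each.getTable(),
                        each.getJobMetaData().getJobItemCount()))
                .collect(Collectors.toList());
    }
}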
CDCJobAPI.java
@@ -50,8 +50,8 @@
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
@@ -290,10 +290,10 @@ public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
     }
 
     @Override
-    public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
+    public PipelineJobInfo getJobInfo(final String jobId) {
         PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
-        return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
+        return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
     }
 
     @Override
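Note how the CDC implementation fills the (to-be-renamed) table field: it joins every subscribed schema-qualified table name into one comma-separated string. A tiny runnable illustration of that join (the sample names are assumptions):

import java.util.Arrays;
import java.util.List;

final class SchemaTableJoinDemo {
    
    public static void main(final String[] args) {
        // Hypothetical schema-qualified table names from a CDC job configuration.
        List<String> schemaTableNames = Arrays.asList("public.t_order", "public.t_order_item");
        // The same join the CDC getJobInfo performs when building PipelineJobInfo.
        System.out.println(String.join(", ", schemaTableNames));
        // -> public.t_order, public.t_order_item
    }
}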
MigrationJobAPI.java
@@ -39,8 +39,8 @@
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
@@ -209,12 +209,12 @@ private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<Dat
     }
 
     @Override
-    public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
+    public PipelineJobInfo getJobInfo(final String jobId) {
         PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         List<String> sourceTables = new LinkedList<>();
         new PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
                 .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
-        return new TableBasedPipelineJobInfo(jobMetaData, String.join(",", sourceTables));
+        return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables));
     }
 
     @Override
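A detail visible in this hunk: the migration implementation passes null for databaseName, matching the old two-argument TableBasedPipelineJobInfo constructor it replaces, which carried no database name either, whereas the CDC implementation above populates the field. Consumers that render it may want a guard; a hypothetical null-safe helper (an assumption, not code from this commit):

import java.util.Optional;

final class PipelineJobInfoRendering {
    
    // Migration jobs build PipelineJobInfo with a null databaseName,
    // so normalize to an empty string before display (hypothetical helper).
    static String databaseNameOrEmpty(final PipelineJobInfo jobInfo) {
        return Optional.ofNullable(jobInfo.getDatabaseName()).orElse("");
    }
}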
