Skip to content

Commit

Permalink
Remove AbstractPipelineJobAPIImpl.buildPipelineJobMetaData() (#29016)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 12, 2023
1 parent d696698 commit 0f0ccfc
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;

/**
* Pipeline job meta data.
Expand All @@ -38,4 +39,9 @@ public final class PipelineJobMetaData {
private final String stopTime;

private final String jobParameter;

public PipelineJobMetaData(final JobConfigurationPOJO jobConfigPOJO) {
this(jobConfigPOJO.getJobName(), !jobConfigPOJO.isDisabled(),
jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
Expand All @@ -51,11 +50,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {

protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

protected PipelineJobMetaData buildPipelineJobMetaData(final JobConfigurationPOJO jobConfigPOJO) {
return new PipelineJobMetaData(jobConfigPOJO.getJobName(), !jobConfigPOJO.isDisabled(),
jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());
}

@Override
public Optional<String> start(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final Pipeline
@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
PipelineJobMetaData jobMetaData = buildPipelineJobMetaData(jobConfigPOJO);
PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO);
CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<Dat
@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
PipelineJobMetaData jobMetaData = buildPipelineJobMetaData(jobConfigPOJO);
PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO);
List<String> sourceTables = new LinkedList<>();
getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes()
.forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
Expand Down

0 comments on commit 0f0ccfc

Please sign in to comment.