Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generic type of PipelineJobManager.getJobConfiguration() #29082

Merged
merged 2 commits into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;

import java.sql.SQLException;
import java.util.List;

/**
* Inventory incremental job API.
Expand Down Expand Up @@ -77,14 +75,6 @@ default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwa
*/
void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);

/**
* Get job infos.
*
* @param jobId job ID
* @return job item infos
*/
List<InventoryIncrementalJobItemInfo> getJobItemInfos(String jobId);

/**
* Build pipeline data consistency checker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
Expand All @@ -41,7 +44,9 @@
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -52,7 +57,7 @@
@RequiredArgsConstructor
public final class InventoryIncrementalJobManager {

private final PipelineJobAPI jobAPI;
private final InventoryIncrementalJobAPI jobAPI;

private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

Expand All @@ -77,6 +82,41 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte
return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, jobAPI.getType()));
}

/**
* Get job infos.
*
* @param jobId job ID
* @return job item infos
*/
public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId) {
PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobConfigPOJO);
long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI);
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobManager.getJobProgress(jobConfig);
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
PipelineJobItemManager<InventoryIncrementalJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
int shardingItem = entry.getKey();
TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId);
InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
String errorMessage = jobItemManager.getErrorMessage(jobId, shardingItem);
if (null == jobItemProgress) {
result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
continue;
}
int inventoryFinishedPercentage = 0;
if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemProgress.getStatus() || JobStatus.FINISHED == jobItemProgress.getStatus()) {
inventoryFinishedPercentage = 100;
} else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
}
result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage));
}
return result;
}

/**
* Get job progress.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ public final class PipelineJobManager {
* Get job configuration.
*
* @param jobConfigPOJO job configuration POJO
* @param <T> type of pipeline job configuration
* @return pipeline job configuration
*/
public PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
@SuppressWarnings("unchecked")
public <T extends PipelineJobConfiguration> T getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
Expand All @@ -41,7 +42,7 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingStatusStatement sqlStatement) {
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getType());
List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(sqlStatement.getJobId());
List<InventoryIncrementalJobItemInfo> jobItemInfos = new InventoryIncrementalJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
Expand All @@ -39,7 +40,7 @@ public final class ShowStreamingRuleExecutor implements QueryableRALExecutor<Sho

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingRuleStatement sqlStatement) {
PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING"))
PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING"))
.showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.migration.distsql.handler.query;

import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
Expand All @@ -36,7 +37,7 @@ public final class ShowMigrationCheckAlgorithmsExecutor implements QueryableRALE

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationCheckAlgorithmsStatement sqlStatement) {
InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"));
InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"));
return inventoryIncrementalJobManager.listDataConsistencyCheckAlgorithms().stream().map(
each -> new LocalDataQueryResultRow(each.getType(), each.getTypeAliases(),
each.getSupportedDatabaseTypes().stream().map(DatabaseType::getType).collect(Collectors.joining(",")), each.getDescription()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
Expand All @@ -40,7 +41,7 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationStatusStatement sqlStatement) {
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION");
List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(sqlStatement.getJobId());
List<InventoryIncrementalJobItemInfo> jobItemInfos = new InventoryIncrementalJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme
String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
MigrationJobConfiguration jobConfig = new PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
verifyInventoryFinished(jobConfig);
checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
}
Expand Down
Loading