Skip to content

Commit

Permalink
Split PipelineJobAPI to PipelineJobOption and PipelineJobAPI
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 25, 2023
1 parent 618c077 commit 12d885e
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
Expand All @@ -34,7 +34,7 @@
*/
public final class ShowMigrationListExecutor implements QueryableRALExecutor<ShowMigrationListStatement> {

private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new MigrationJobAPI());
private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new MigrationJobOption());

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationListStatement sqlStatement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.query;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
Expand All @@ -35,11 +35,11 @@
*/
public final class ShowMigrationSourceStorageUnitsExecutor implements QueryableRALExecutor<ShowMigrationSourceStorageUnitsStatement> {

private final MigrationJobAPI jobAPI = new MigrationJobAPI();
private final MigrationJobOption jobOption = new MigrationJobOption();

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationSourceStorageUnitsStatement sqlStatement) {
Iterator<Collection<Object>> data = jobAPI.listMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY)).iterator();
Iterator<Collection<Object>> data = jobOption.listMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY)).iterator();
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
while (data.hasNext()) {
result.add(new LocalDataQueryResultRow((List<Object>) data.next()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
Expand All @@ -32,13 +32,13 @@
@Slf4j
public final class MigrateTableUpdater implements RALUpdater<MigrateTableStatement> {

private final MigrationJobAPI jobAPI = new MigrationJobAPI();
private final MigrationJobOption jobOption = new MigrationJobOption();

@Override
public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? databaseName : sqlStatement.getTargetDatabaseName();
ShardingSpherePreconditions.checkNotNull(targetDatabaseName, MissingRequiredTargetDatabaseException::new);
jobAPI.createJobAndStart(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
jobOption.createJobAndStart(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.distsql.handler.validate.DataSourcePoolPropertiesValidateHandler;
import org.apache.shardingsphere.distsql.segment.DataSourceSegment;
Expand All @@ -42,7 +42,7 @@
*/
public final class RegisterMigrationSourceStorageUnitUpdater implements RALUpdater<RegisterMigrationSourceStorageUnitStatement> {

private final MigrationJobAPI jobAPI = new MigrationJobAPI();
private final MigrationJobOption jobOption = new MigrationJobOption();

private final DataSourcePoolPropertiesValidateHandler validateHandler = new DataSourcePoolPropertiesValidateHandler();

Expand All @@ -55,7 +55,7 @@ public void executeUpdate(final String databaseName, final RegisterMigrationSour
DatabaseType databaseType = DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
Map<String, DataSourcePoolProperties> propsMap = DataSourceSegmentsConverter.convert(databaseType, dataSources);
validateHandler.validate(propsMap);
jobAPI.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
jobOption.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;

Expand All @@ -27,7 +27,7 @@
*/
public final class StartMigrationUpdater implements RALUpdater<StartMigrationStatement> {

private final PipelineJobManager jobManager = new PipelineJobManager(new MigrationJobAPI());
private final PipelineJobManager jobManager = new PipelineJobManager(new MigrationJobOption());

@Override
public void executeUpdate(final String databaseName, final StartMigrationStatement sqlStatement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;

Expand All @@ -27,9 +27,9 @@
*/
public final class StopMigrationUpdater implements RALUpdater<StopMigrationStatement> {

private final MigrationJobAPI jobAPI = new MigrationJobAPI();
private final MigrationJobOption jobOption = new MigrationJobOption();

private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
private final PipelineJobManager jobManager = new PipelineJobManager(jobOption);

@Override
public void executeUpdate(final String databaseName, final StopMigrationStatement sqlStatement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;
Expand All @@ -28,11 +28,11 @@
*/
public final class UnregisterMigrationSourceStorageUnitUpdater implements RALUpdater<UnregisterMigrationSourceStorageUnitStatement> {

private final MigrationJobAPI jobAPI = new MigrationJobAPI();
private final MigrationJobOption jobOption = new MigrationJobOption();

@Override
public void executeUpdate(final String databaseName, final UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
jobAPI.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
jobOption.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
Expand All @@ -31,8 +27,13 @@
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataUtils;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
Expand All @@ -42,9 +43,8 @@
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;

Expand Down Expand Up @@ -99,7 +99,7 @@ public Map<String, TableDataConsistencyCheckResult> check(final String algorithm
}

private long getRecordsCount() {
Map<Integer, TransmissionJobItemProgress> jobProgress = new TransmissionJobManager(new MigrationJobAPI()).getJobProgress(jobConfig);
Map<Integer, TransmissionJobItemProgress> jobProgress = new TransmissionJobManager(new MigrationJobOption()).getJobProgress(jobConfig);
return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getProcessedRecordsCount).sum();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
Expand All @@ -81,9 +81,9 @@
@Slf4j
public final class MigrationJobPreparer {

private final MigrationJobAPI jobAPI = new MigrationJobAPI();
private final MigrationJobOption jobOption = new MigrationJobOption();

private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());

/**
* Do prepare work.
Expand Down

0 comments on commit 12d885e

Please sign in to comment.