diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractInventoryIncrementalProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractTransmissionProcessContext.java similarity index 95% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractInventoryIncrementalProcessContext.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractTransmissionProcessContext.java index 245439f85b772..62b28959edcd1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractInventoryIncrementalProcessContext.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractTransmissionProcessContext.java @@ -32,10 +32,10 @@ import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; /** - * Abstract inventory incremental process context. + * Abstract transmission process context. */ @Getter -public abstract class AbstractInventoryIncrementalProcessContext implements InventoryIncrementalProcessContext { +public abstract class AbstractTransmissionProcessContext implements TransmissionProcessContext { private final PipelineProcessConfiguration pipelineProcessConfig; @@ -51,7 +51,7 @@ public abstract class AbstractInventoryIncrementalProcessContext implements Inve private final PipelineLazyInitializer incrementalExecuteEngineLazyInitializer; - protected AbstractInventoryIncrementalProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) { + protected AbstractTransmissionProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) { PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig); this.pipelineProcessConfig = processConfig; PipelineReadConfiguration readConfig = processConfig.getRead(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionJobItemContext.java similarity index 84% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalJobItemContext.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionJobItemContext.java index ae0cd2f48c2a9..1b2fd5cf96f21 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalJobItemContext.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionJobItemContext.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.common.context; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressListener; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; @@ -26,12 +26,12 @@ import java.util.Collection; /** - * Inventory incremental job item context. + * Transmission job item context. */ -public interface InventoryIncrementalJobItemContext extends PipelineJobItemContext, PipelineJobProgressListener { +public interface TransmissionJobItemContext extends PipelineJobItemContext, PipelineJobProgressListener { @Override - InventoryIncrementalProcessContext getJobProcessContext(); + TransmissionProcessContext getJobProcessContext(); /** * Get inventory tasks. @@ -52,7 +52,7 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte * * @return init progress */ - InventoryIncrementalJobItemProgress getInitProgress(); + TransmissionJobItemProgress getInitProgress(); /** * Get source meta data loader. @@ -90,7 +90,7 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte long getInventoryRecordsCount(); @Override - default InventoryIncrementalJobItemProgress toProgress() { - return new InventoryIncrementalJobItemProgress(this); + default TransmissionJobItemProgress toProgress() { + return new TransmissionJobItemProgress(this); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionProcessContext.java similarity index 94% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalProcessContext.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionProcessContext.java index 8b57d763e9139..e09b9ca9d961f 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/InventoryIncrementalProcessContext.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionProcessContext.java @@ -22,9 +22,9 @@ import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; /** - * Inventory incremental process context. + * Transmission process context. */ -public interface InventoryIncrementalProcessContext extends PipelineProcessContext { +public interface TransmissionProcessContext extends PipelineProcessContext { /** * Get pipeline channel creator. diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/TransmissionJobItemProgress.java similarity index 90% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/TransmissionJobItemProgress.java index d0ffdb41df45a..3822b7ccfee44 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/TransmissionJobItemProgress.java @@ -20,7 +20,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress; import org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress; @@ -32,12 +32,12 @@ import java.util.Map; /** - * Inventory incremental job item progress. + * Transmission job item progress. */ @NoArgsConstructor @Getter @Setter -public final class InventoryIncrementalJobItemProgress implements PipelineJobItemProgress { +public final class TransmissionJobItemProgress implements PipelineJobItemProgress { private DatabaseType sourceDatabaseType; @@ -55,7 +55,7 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte private JobStatus status = JobStatus.RUNNING; - public InventoryIncrementalJobItemProgress(final InventoryIncrementalJobItemContext context) { + public TransmissionJobItemProgress(final TransmissionJobItemContext context) { sourceDatabaseType = context.getJobConfig().getSourceDatabaseType(); dataSourceName = context.getDataSourceName(); inventory = getInventoryTasksProgress(context.getInventoryTasks()); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlTransmissionJobItemProgress.java similarity index 89% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlTransmissionJobItemProgress.java index 1e44dcf8984bb..0310749c882c8 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlTransmissionJobItemProgress.java @@ -22,11 +22,11 @@ import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration; /** - * YAML inventory incremental job item progress. + * YAML transmission job item progress. */ @Getter @Setter -public final class YamlInventoryIncrementalJobItemProgress implements YamlPipelineJobItemProgressConfiguration { +public final class YamlTransmissionJobItemProgress implements YamlPipelineJobItemProgressConfiguration { private String status; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlTransmissionJobItemProgressSwapper.java similarity index 76% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlTransmissionJobItemProgressSwapper.java index 1b29d4f52de0d..f58abe18b1443 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlTransmissionJobItemProgressSwapper.java @@ -18,23 +18,23 @@ package org.apache.shardingsphere.data.pipeline.common.job.progress.yaml; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; /** - * YAML inventory incremental job item progress swapper. + * YAML transmission job item progress swapper. */ -public final class YamlInventoryIncrementalJobItemProgressSwapper implements YamlPipelineJobItemProgressSwapper { +public final class YamlTransmissionJobItemProgressSwapper implements YamlPipelineJobItemProgressSwapper { private final YamlJobItemInventoryTasksProgressSwapper inventoryTasksProgressSwapper = new YamlJobItemInventoryTasksProgressSwapper(); private final YamlJobItemIncrementalTasksProgressSwapper incrementalTasksProgressSwapper = new YamlJobItemIncrementalTasksProgressSwapper(); @Override - public YamlInventoryIncrementalJobItemProgress swapToYamlConfiguration(final InventoryIncrementalJobItemProgress progress) { - YamlInventoryIncrementalJobItemProgress result = new YamlInventoryIncrementalJobItemProgress(); + public YamlTransmissionJobItemProgress swapToYamlConfiguration(final TransmissionJobItemProgress progress) { + YamlTransmissionJobItemProgress result = new YamlTransmissionJobItemProgress(); result.setStatus(progress.getStatus().name()); result.setSourceDatabaseType(progress.getSourceDatabaseType().getType()); result.setDataSourceName(progress.getDataSourceName()); @@ -46,8 +46,8 @@ public YamlInventoryIncrementalJobItemProgress swapToYamlConfiguration(final Inv } @Override - public InventoryIncrementalJobItemProgress swapToObject(final YamlInventoryIncrementalJobItemProgress yamlProgress) { - InventoryIncrementalJobItemProgress result = new InventoryIncrementalJobItemProgress(); + public TransmissionJobItemProgress swapToObject(final YamlTransmissionJobItemProgress yamlProgress) { + TransmissionJobItemProgress result = new TransmissionJobItemProgress(); result.setStatus(JobStatus.valueOf(yamlProgress.getStatus())); result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType())); result.setDataSourceName(yamlProgress.getDataSourceName()); @@ -59,7 +59,7 @@ public InventoryIncrementalJobItemProgress swapToObject(final YamlInventoryIncre } @Override - public Class getYamlProgressClass() { - return YamlInventoryIncrementalJobItemProgress.class; + public Class getYamlProgressClass() { + return YamlTransmissionJobItemProgress.class; } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/InventoryIncrementalJobItemInfo.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/TransmissionJobItemInfo.java similarity index 86% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/InventoryIncrementalJobItemInfo.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/TransmissionJobItemInfo.java index 440e09cf37c70..c9feb57ce9336 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/InventoryIncrementalJobItemInfo.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/TransmissionJobItemInfo.java @@ -19,20 +19,20 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; /** - * Inventory incremental job item info. + * Transmission job item info. */ @RequiredArgsConstructor @Getter -public class InventoryIncrementalJobItemInfo { +public class TransmissionJobItemInfo { private final int shardingItem; private final String tableNames; - private final InventoryIncrementalJobItemProgress jobItemProgress; + private final TransmissionJobItemProgress jobItemProgress; private final long startTimeMillis; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java index ae07dba0ecbaa..e55cdb4d8cd6e 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java @@ -21,7 +21,7 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import java.util.Collection; @@ -54,15 +54,15 @@ public static boolean isAllInventoryTasksFinished(final Collection * @param jobItemProgresses job item progresses * @return finished or not */ - public static boolean isInventoryFinished(final int jobShardingCount, final Collection jobItemProgresses) { + public static boolean isInventoryFinished(final int jobShardingCount, final Collection jobItemProgresses) { return isAllProgressesFilled(jobShardingCount, jobItemProgresses) && isAllInventoryTasksCompleted(jobItemProgresses); } - private static boolean isAllProgressesFilled(final int jobShardingCount, final Collection jobItemProgresses) { + private static boolean isAllProgressesFilled(final int jobShardingCount, final Collection jobItemProgresses) { return jobShardingCount == jobItemProgresses.size() && jobItemProgresses.stream().allMatch(Objects::nonNull); } - private static boolean isAllInventoryTasksCompleted(final Collection jobItemProgresses) { + private static boolean isAllInventoryTasksCompleted(final Collection jobItemProgresses) { if (jobItemProgresses.stream().allMatch(each -> each.getInventory().getProgresses().isEmpty())) { return false; } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index 1a11a902d52ca..b5414e71b5411 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -186,10 +186,10 @@ public void drop(final String jobId) { * @return jobs info */ public List getJobInfos(final PipelineContextKey contextKey) { - if (jobAPI instanceof InventoryIncrementalJobAPI) { + if (jobAPI instanceof TransmissionJobAPI) { return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream() .filter(each -> !each.getJobName().startsWith("_") && jobAPI.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType())) - .map(each -> ((InventoryIncrementalJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList()); + .map(each -> ((TransmissionJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList()); } return Collections.emptyList(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java similarity index 85% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java index 76a66b00e6b60..7a9431b794e29 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java @@ -20,9 +20,9 @@ import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper; +import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; @@ -31,14 +31,14 @@ import java.sql.SQLException; /** - * Inventory incremental job API. + * Transmission job API. */ -public interface InventoryIncrementalJobAPI extends PipelineJobAPI { +public interface TransmissionJobAPI extends PipelineJobAPI { @SuppressWarnings("unchecked") @Override - default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwapper() { - return new YamlInventoryIncrementalJobItemProgressSwapper(); + default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() { + return new YamlTransmissionJobItemProgressSwapper(); } /** @@ -65,7 +65,7 @@ default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwa * @param pipelineJobConfig pipeline job configuration * @return pipeline process context */ - InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig); + TransmissionProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig); /** * Extend YAML job configuration. @@ -83,7 +83,7 @@ default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwa * @param progressContext consistency check job item progress context * @return all logic tables check result */ - PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext, + PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, TransmissionProcessContext processContext, ConsistencyCheckJobItemProgressContext progressContext); /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java similarity index 77% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java index afa4fa3e78a7d..cc3f9c464438f 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java @@ -23,8 +23,8 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; @@ -39,12 +39,12 @@ import java.util.stream.IntStream; /** - * Inventory incremental job manager. + * Transmission job manager. */ @RequiredArgsConstructor -public final class InventoryIncrementalJobManager { +public final class TransmissionJobManager { - private final InventoryIncrementalJobAPI jobAPI; + private final TransmissionJobAPI jobAPI; private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); @@ -75,18 +75,18 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte * @param jobId job ID * @return job item infos */ - public List getJobItemInfos(final String jobId) { + public List getJobItemInfos(final String jobId) { PipelineJobConfiguration jobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(jobId); long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0")); - Map jobProgress = getJobProgress(jobConfig); - List result = new LinkedList<>(); + Map jobProgress = getJobProgress(jobConfig); + List result = new LinkedList<>(); PipelineJobInfo jobInfo = jobAPI.getJobInfo(jobId); - for (Entry entry : jobProgress.entrySet()) { + for (Entry entry : jobProgress.entrySet()) { int shardingItem = entry.getKey(); - InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue(); + TransmissionJobItemProgress jobItemProgress = entry.getValue(); String errorMessage = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().load(jobId, shardingItem); if (null == jobItemProgress) { - result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTableName(), null, startTimeMillis, 0, errorMessage)); + result.add(new TransmissionJobItemInfo(shardingItem, jobInfo.getTableName(), null, startTimeMillis, 0, errorMessage)); continue; } int inventoryFinishedPercentage = 0; @@ -95,7 +95,7 @@ public List getJobItemInfos(final String jobId) } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) { inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount()); } - result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTableName(), jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage)); + result.add(new TransmissionJobItemInfo(shardingItem, jobInfo.getTableName(), jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage)); } return result; } @@ -106,12 +106,12 @@ public List getJobItemInfos(final String jobId) * @param jobConfig pipeline job configuration * @return each sharding item progress */ - public Map getJobProgress(final PipelineJobConfiguration jobConfig) { - PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + public Map getJobProgress(final PipelineJobConfiguration jobConfig) { + PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); String jobId = jobConfig.getJobId(); JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> { - Optional jobItemProgress = jobItemManager.getProgress(jobId, each); + Optional jobItemProgress = jobItemManager.getProgress(jobId, each); jobItemProgress.ifPresent(optional -> optional.setActive(!jobConfigPOJO.isDisabled())); map.put(each, jobItemProgress.orElse(null)); }, LinkedHashMap::putAll); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java index 4a3eaa395cd79..225e98f8f8aaf 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java @@ -27,14 +27,14 @@ import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineReadConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.StringPrimaryKeyPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataUtils; import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder; import org.apache.shardingsphere.data.pipeline.common.util.IntervalToRangeIterator; @@ -78,10 +78,10 @@ public final class InventoryTaskSplitter { * @param jobItemContext job item context * @return split inventory data task */ - public List splitInventoryData(final InventoryIncrementalJobItemContext jobItemContext) { + public List splitInventoryData(final TransmissionJobItemContext jobItemContext) { List result = new LinkedList<>(); long startTimeMillis = System.currentTimeMillis(); - InventoryIncrementalProcessContext processContext = jobItemContext.getJobProcessContext(); + TransmissionProcessContext processContext = jobItemContext.getJobProcessContext(); for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) { AtomicReference position = new AtomicReference<>(each.getCommonContext().getPosition()); PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position); @@ -100,7 +100,7 @@ public List splitInventoryData(final InventoryIncrementalJobItemC * @param jobItemContext job item context * @return inventory dumper contexts */ - public Collection splitInventoryDumperContext(final InventoryIncrementalJobItemContext jobItemContext) { + public Collection splitInventoryDumperContext(final TransmissionJobItemContext jobItemContext) { Collection result = new LinkedList<>(); for (InventoryDumperContext each : splitByTable(dumperContext)) { result.addAll(splitByPrimaryKey(each, jobItemContext, sourceDataSource)); @@ -123,7 +123,7 @@ private Collection splitByTable(final InventoryDumperCon return result; } - private Collection splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext, + private Collection splitByPrimaryKey(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) { if (null == dumperContext.getUniqueKeyColumns()) { String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); @@ -132,7 +132,7 @@ private Collection splitByPrimaryKey(final InventoryDump dumperContext.setUniqueKeyColumns(uniqueKeyColumns); } Collection result = new LinkedList<>(); - InventoryIncrementalProcessContext jobProcessContext = jobItemContext.getJobProcessContext(); + TransmissionProcessContext jobProcessContext = jobItemContext.getJobProcessContext(); PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead(); int batchSize = readConfig.getBatchSize(); JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm(); @@ -153,9 +153,9 @@ private Collection splitByPrimaryKey(final InventoryDump return result; } - private Collection getInventoryPositions(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext, + private Collection getInventoryPositions(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) { - InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress(); + TransmissionJobItemProgress initProgress = jobItemContext.getInitProgress(); if (null != initProgress) { // Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center. Collection result = initProgress.getInventory().getInventoryPosition(dumperContext.getActualTableName()).values(); @@ -183,7 +183,7 @@ private Collection getInventoryPositions(final InventoryDumperCo } private Collection getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount, - final InventoryIncrementalJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) { + final TransmissionJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) { if (0 == tableRecordsCount) { return Collections.singletonList(new IntegerPrimaryKeyPosition(0, 0)); } @@ -200,7 +200,7 @@ private Collection getPositionByIntegerUniqueKeyRange(final Inve return result; } - private Range getUniqueKeyValuesRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperContext dumperContext) { + private Range getUniqueKeyValuesRange(final TransmissionJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperContext dumperContext) { String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName(); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType()); String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL( diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java index d49084862bd65..d62986557df89 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java @@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.channel.AckCallbacks; import org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress; import java.util.Optional; @@ -54,7 +54,7 @@ public static String generateInventoryTaskId(final InventoryDumperContext invent * @param initProgress initial job item progress * @return incremental task progress */ - public static IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition position, final InventoryIncrementalJobItemProgress initProgress) { + public static IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition position, final TransmissionJobItemProgress initProgress) { IncrementalTaskProgress result = new IncrementalTaskProgress(position); if (null != initProgress && null != initProgress.getIncremental()) { Optional.ofNullable(initProgress.getIncremental().getIncrementalTaskProgress()) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java similarity index 93% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java index be8da1323fe27..9571b6c093412 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java @@ -20,12 +20,12 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback; import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine; import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; @@ -43,14 +43,14 @@ import java.util.concurrent.CompletableFuture; /** - * Inventory incremental tasks' runner. + * Transmission tasks' runner. */ @RequiredArgsConstructor @Slf4j -public class InventoryIncrementalTasksRunner implements PipelineTasksRunner { +public class TransmissionTasksRunner implements PipelineTasksRunner { @Getter - private final InventoryIncrementalJobItemContext jobItemContext; + private final TransmissionJobItemContext jobItemContext; private final Collection inventoryTasks; @@ -60,9 +60,9 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner { private final PipelineJobManager jobManager; - private final PipelineJobItemManager jobItemManager; + private final PipelineJobItemManager jobItemManager; - public InventoryIncrementalTasksRunner(final InventoryIncrementalJobItemContext jobItemContext) { + public TransmissionTasksRunner(final TransmissionJobItemContext jobItemContext) { this.jobItemContext = jobItemContext; inventoryTasks = jobItemContext.getInventoryTasks(); incrementalTasks = jobItemContext.getIncrementalTasks(); diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/TransmissionJobItemProgressTest.java similarity index 87% rename from kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java rename to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/TransmissionJobItemProgressTest.java index 9679fbad46ab8..2eee4fe05d79a 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/TransmissionJobItemProgressTest.java @@ -24,8 +24,8 @@ import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.StringPrimaryKeyPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper; +import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper; import org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.apache.shardingsphere.test.util.ConfigurationFileUtils; @@ -41,13 +41,13 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -class InventoryIncrementalJobItemProgressTest { +class TransmissionJobItemProgressTest { - private static final YamlInventoryIncrementalJobItemProgressSwapper SWAPPER = new YamlInventoryIncrementalJobItemProgressSwapper(); + private static final YamlTransmissionJobItemProgressSwapper SWAPPER = new YamlTransmissionJobItemProgressSwapper(); @Test void assertInit() { - InventoryIncrementalJobItemProgress actual = getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml")); + TransmissionJobItemProgress actual = getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml")); assertThat(actual.getStatus(), is(JobStatus.RUNNING)); assertThat(actual.getSourceDatabaseType().getType(), is("H2")); assertThat(actual.getInventory().getProgresses().size(), is(4)); @@ -56,7 +56,7 @@ void assertInit() { @Test void assertGetIncrementalPosition() { - InventoryIncrementalJobItemProgress actual = getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml")); + TransmissionJobItemProgress actual = getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml")); Optional position = actual.getIncremental().getIncrementalPosition(); assertTrue(position.isPresent()); assertThat(position.get(), instanceOf(PlaceholderPosition.class)); @@ -64,7 +64,7 @@ void assertGetIncrementalPosition() { @Test void assertGetInventoryPosition() { - InventoryIncrementalJobItemProgress actual = getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml")); + TransmissionJobItemProgress actual = getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml")); assertThat(actual.getInventory().getInventoryPosition("t_1").get("ds0.t_1#1"), instanceOf(FinishedPosition.class)); assertThat(actual.getInventory().getInventoryPosition("t_1").get("ds1.t_1#1"), instanceOf(PlaceholderPosition.class)); assertThat(actual.getInventory().getInventoryPosition("t_2").get("ds0.t_2#2"), instanceOf(FinishedPosition.class)); @@ -101,7 +101,7 @@ void assertGetProgressesCorrectly() { assertThat(orderItemPosition.get("ds.order_item#1"), instanceOf(UnsupportedKeyPosition.class)); } - private InventoryIncrementalJobItemProgress getJobItemProgress(final String data) { - return SWAPPER.swapToObject(YamlEngine.unmarshal(data, YamlInventoryIncrementalJobItemProgress.class)); + private TransmissionJobItemProgress getJobItemProgress(final String data) { + return SWAPPER.swapToObject(YamlEngine.unmarshal(data, YamlTransmissionJobItemProgress.class)); } } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlTransmissionJobItemProgressSwapperTest.java similarity index 66% rename from kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java rename to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlTransmissionJobItemProgressSwapperTest.java index 31eac91f4f885..2a28152394de1 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlTransmissionJobItemProgressSwapperTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.common.job.progress.yaml; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.apache.shardingsphere.test.util.ConfigurationFileUtils; import org.junit.jupiter.api.Test; @@ -26,14 +26,14 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; -class YamlInventoryIncrementalJobItemProgressSwapperTest { +class YamlTransmissionJobItemProgressSwapperTest { - private static final YamlInventoryIncrementalJobItemProgressSwapper SWAPPER = new YamlInventoryIncrementalJobItemProgressSwapper(); + private static final YamlTransmissionJobItemProgressSwapper SWAPPER = new YamlTransmissionJobItemProgressSwapper(); @Test void assertFullSwapToYamlConfiguration() { - InventoryIncrementalJobItemProgress progress = SWAPPER.swapToObject(YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress.yaml"), YamlInventoryIncrementalJobItemProgress.class)); - YamlInventoryIncrementalJobItemProgress actual = SWAPPER.swapToYamlConfiguration(progress); + TransmissionJobItemProgress progress = SWAPPER.swapToObject(YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress.yaml"), YamlTransmissionJobItemProgress.class)); + YamlTransmissionJobItemProgress actual = SWAPPER.swapToYamlConfiguration(progress); assertThat(actual.getStatus(), is("RUNNING")); assertThat(actual.getSourceDatabaseType(), is("H2")); assertThat(actual.getDataSourceName(), is("ds_0")); @@ -47,20 +47,20 @@ void assertFullSwapToYamlConfiguration() { @Test void assertSwapWithFullConfig() { - YamlInventoryIncrementalJobItemProgress yamlProgress = YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress.yaml"), YamlInventoryIncrementalJobItemProgress.class); - YamlInventoryIncrementalJobItemProgress actual = SWAPPER.swapToYamlConfiguration(SWAPPER.swapToObject(yamlProgress)); + YamlTransmissionJobItemProgress yamlProgress = YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress.yaml"), YamlTransmissionJobItemProgress.class); + YamlTransmissionJobItemProgress actual = SWAPPER.swapToYamlConfiguration(SWAPPER.swapToObject(yamlProgress)); assertThat(YamlEngine.marshal(actual), is(YamlEngine.marshal(yamlProgress))); } @Test void assertSwapWithoutInventoryIncremental() { - YamlInventoryIncrementalJobItemProgress yamlProgress = YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress-failure.yaml"), YamlInventoryIncrementalJobItemProgress.class); - InventoryIncrementalJobItemProgress progress = SWAPPER.swapToObject(yamlProgress); + YamlTransmissionJobItemProgress yamlProgress = YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress-failure.yaml"), YamlTransmissionJobItemProgress.class); + TransmissionJobItemProgress progress = SWAPPER.swapToObject(yamlProgress); assertNotNull(progress.getInventory()); assertNotNull(progress.getIncremental()); assertThat(progress.getDataSourceName(), is("ds_0")); assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(), is(0L)); - YamlInventoryIncrementalJobItemProgress actual = SWAPPER.swapToYamlConfiguration(progress); + YamlTransmissionJobItemProgress actual = SWAPPER.swapToYamlConfiguration(progress); assertNotNull(actual.getInventory()); assertNotNull(actual.getIncremental()); assertThat(YamlEngine.marshal(actual), is(YamlEngine.marshal(yamlProgress))); @@ -68,13 +68,13 @@ void assertSwapWithoutInventoryIncremental() { @Test void assertSwapWithRunningConfig() { - YamlInventoryIncrementalJobItemProgress yamlProgress = YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress-running.yaml"), YamlInventoryIncrementalJobItemProgress.class); - InventoryIncrementalJobItemProgress progress = SWAPPER.swapToObject(yamlProgress); + YamlTransmissionJobItemProgress yamlProgress = YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress-running.yaml"), YamlTransmissionJobItemProgress.class); + TransmissionJobItemProgress progress = SWAPPER.swapToObject(yamlProgress); assertNotNull(progress.getInventory()); assertNotNull(progress.getIncremental()); assertThat(progress.getDataSourceName(), is("ds_0")); assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(), is(0L)); - YamlInventoryIncrementalJobItemProgress actual = SWAPPER.swapToYamlConfiguration(progress); + YamlTransmissionJobItemProgress actual = SWAPPER.swapToYamlConfiguration(progress); assertNotNull(actual.getInventory()); assertNotNull(actual.getIncremental()); assertThat(YamlEngine.marshal(actual), is(YamlEngine.marshal(yamlProgress))); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java index 60bed768d2257..287fb6fbde8cf 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java @@ -19,10 +19,10 @@ import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement; import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow; @@ -41,14 +41,14 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto @Override public Collection getRows(final ShowStreamingStatusStatement sqlStatement) { - InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getType()); - List jobItemInfos = new InventoryIncrementalJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId()); + TransmissionJobAPI jobAPI = (TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getType()); + List jobItemInfos = new TransmissionJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId()); long currentTimeMillis = System.currentTimeMillis(); return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList()); } - private LocalDataQueryResultRow generateResultRow(final InventoryIncrementalJobItemInfo jobItemInfo, final long currentTimeMillis) { - InventoryIncrementalJobItemProgress jobItemProgress = jobItemInfo.getJobItemProgress(); + private LocalDataQueryResultRow generateResultRow(final TransmissionJobItemInfo jobItemInfo, final long currentTimeMillis) { + TransmissionJobItemProgress jobItemProgress = jobItemInfo.getJobItemProgress(); if (null == jobItemProgress) { return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), "", "", "", "", "", "", jobItemInfo.getErrorMessage()); } diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java index 78a02f6ea1f9c..a043e01e70470 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java @@ -20,8 +20,8 @@ import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; @@ -40,7 +40,7 @@ public final class ShowStreamingRuleExecutor implements QueryableRALExecutor getRows(final ShowStreamingRuleStatement sqlStatement) { - PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING")) + PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING")) .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY)); Collection result = new LinkedList<>(); result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java index 79684dc0eb1f4..4a1ecdf49893d 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java @@ -17,10 +17,10 @@ package org.apache.shardingsphere.migration.distsql.handler.query; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow; @@ -40,14 +40,14 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto @Override public Collection getRows(final ShowMigrationStatusStatement sqlStatement) { - InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"); - List jobItemInfos = new InventoryIncrementalJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId()); + TransmissionJobAPI jobAPI = (TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"); + List jobItemInfos = new TransmissionJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId()); long currentTimeMillis = System.currentTimeMillis(); return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList()); } - private LocalDataQueryResultRow generateResultRow(final InventoryIncrementalJobItemInfo jobItemInfo, final long currentTimeMillis) { - InventoryIncrementalJobItemProgress jobItemProgress = jobItemInfo.getJobItemProgress(); + private LocalDataQueryResultRow generateResultRow(final TransmissionJobItemInfo jobItemInfo, final long currentTimeMillis) { + TransmissionJobItemProgress jobItemProgress = jobItemInfo.getJobItemProgress(); if (null == jobItemProgress) { return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), "", "", "", "", "", "", "", jobItemInfo.getErrorMessage()); } diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java index 1bf18932eccfe..6f72afb054f5a 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java @@ -19,7 +19,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter; @@ -54,8 +54,8 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme } private void verifyInventoryFinished(final MigrationJobConfiguration jobConfig) { - InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(migrationJobAPI); - ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(), inventoryIncrementalJobManager.getJobProgress(jobConfig).values()), + TransmissionJobManager transmissionJobManager = new TransmissionJobManager(migrationJobAPI); + ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(), transmissionJobManager.getJobProgress(jobConfig).values()), () -> new PipelineInvalidParameterException("Inventory is not finished.")); } diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java index d211d6919051b..f8bea1788cfc6 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.migration.distsql.handler.update; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; @@ -32,7 +32,7 @@ public final class CommitMigrationUpdater implements RALUpdater jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()); + PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()); try (PipelineDataSourceManager pipelineDataSourceManager = new DefaultPipelineDataSourceManager()) { for (int i = 0; i < jobConfig.getJobShardingCount(); i++) { if (jobItemManager.getProgress(jobId, i).isPresent()) { continue; } IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames())); - InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext); + TransmissionJobItemProgress jobItemProgress = getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext); PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist( jobId, i, YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress))); } @@ -185,10 +185,10 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) { } } - private static InventoryIncrementalJobItemProgress getInventoryIncrementalJobItemProgress(final CDCJobConfiguration jobConfig, - final PipelineDataSourceManager dataSourceManager, - final IncrementalDumperContext incrementalDumperContext) throws SQLException { - InventoryIncrementalJobItemProgress result = new InventoryIncrementalJobItemProgress(); + private static TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, + final PipelineDataSourceManager dataSourceManager, + final IncrementalDumperContext incrementalDumperContext) throws SQLException { + TransmissionJobItemProgress result = new TransmissionJobItemProgress(); result.setSourceDatabaseType(jobConfig.getSourceDatabaseType()); result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName()); IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, incrementalDumperContext, dataSourceManager)); @@ -278,7 +278,7 @@ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfigurati @Override public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) { - InventoryIncrementalJobManager jobManager = new InventoryIncrementalJobManager(this); + TransmissionJobManager jobManager = new TransmissionJobManager(this); return new CDCProcessContext(pipelineJobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()))); } @@ -325,7 +325,7 @@ public void rollback(final String jobId) throws SQLException { } @Override - public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext, + public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final TransmissionProcessContext processContext, final ConsistencyCheckJobItemProgressContext progressContext) { throw new UnsupportedOperationException(); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java index 1657703287700..d07281e456b54 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java @@ -24,11 +24,11 @@ import org.apache.commons.lang3.concurrent.LazyInitializer; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader; @@ -44,7 +44,7 @@ * CDC job item context. */ @Getter -public final class CDCJobItemContext implements InventoryIncrementalJobItemContext { +public final class CDCJobItemContext implements TransmissionJobItemContext { private final CDCJobConfiguration jobConfig; @@ -56,7 +56,7 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte @Setter private volatile JobStatus status = JobStatus.RUNNING; - private final InventoryIncrementalJobItemProgress initProgress; + private final TransmissionJobItemProgress initProgress; private final CDCProcessContext jobProcessContext; @@ -90,7 +90,7 @@ protected PipelineTableMetaDataLoader initialize() throws ConcurrentException { } }; - public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final InventoryIncrementalJobItemProgress initProgress, final CDCProcessContext jobProcessContext, + public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress initProgress, final CDCProcessContext jobProcessContext, final CDCTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager, final PipelineSink sink) { this.jobConfig = jobConfig; this.shardingItem = shardingItem; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCProcessContext.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCProcessContext.java index 57b3a3eb4e79d..857106e90fa95 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCProcessContext.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCProcessContext.java @@ -18,12 +18,12 @@ package org.apache.shardingsphere.data.pipeline.cdc.context; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.AbstractInventoryIncrementalProcessContext; +import org.apache.shardingsphere.data.pipeline.common.context.AbstractTransmissionProcessContext; /** * CDC process context. */ -public final class CDCProcessContext extends AbstractInventoryIncrementalProcessContext { +public final class CDCProcessContext extends AbstractTransmissionProcessContext { public CDCProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) { super(jobId, originalProcessConfig); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java index 62250972e4b20..31cde63332dfe 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java @@ -37,7 +37,7 @@ import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine; import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; @@ -67,7 +67,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob { private final CDCJobAPI jobAPI = new CDCJobAPI(); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); private final CDCJobPreparer jobPreparer = new CDCJobPreparer(); @@ -108,7 +108,7 @@ public void execute(final ShardingContext shardingContext) { } private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) { - Optional initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem); + Optional initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem); CDCProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig); CDCTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index b39a064745818..ad9a1942b582b 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -28,7 +28,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress; import org.apache.shardingsphere.data.pipeline.common.spi.ingest.dumper.IncrementalDumperCreator; import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress; @@ -69,7 +69,7 @@ public final class CDCJobPreparer { private final CDCJobAPI jobAPI = new CDCJobAPI(); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); /** * Do prepare work. @@ -89,7 +89,7 @@ public void initTasks(final Collection jobItemContexts) { private void initTasks0(final CDCJobItemContext jobItemContext, final AtomicBoolean inventoryImporterUsed, final List inventoryChannelProgressPairs, final AtomicBoolean incrementalImporterUsed, final List incrementalChannelProgressPairs) { - Optional jobItemProgress = jobItemManager.getProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()); + Optional jobItemProgress = jobItemManager.getProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()); if (!jobItemProgress.isPresent()) { jobItemManager.persistProgress(jobItemContext); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java index 8d5d62a743127..edb08f2bd2a5f 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.task; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.infra.util.close.QuietlyCloser; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; @@ -30,13 +30,13 @@ */ public final class CDCTasksRunner implements PipelineTasksRunner { - private final InventoryIncrementalJobItemContext jobItemContext; + private final TransmissionJobItemContext jobItemContext; private final Collection inventoryTasks; private final Collection incrementalTasks; - public CDCTasksRunner(final InventoryIncrementalJobItemContext jobItemContext) { + public CDCTasksRunner(final TransmissionJobItemContext jobItemContext) { this.jobItemContext = jobItemContext; inventoryTasks = jobItemContext.getInventoryTasks(); incrementalTasks = jobItemContext.getIncrementalTasks(); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 27a54b2e22b02..58db6fc27c995 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -25,12 +25,12 @@ import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine; import org.apache.shardingsphere.data.pipeline.common.execute.PipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; @@ -56,7 +56,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner { private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); @Getter private final ConsistencyCheckJobItemContext jobItemContext; @@ -102,7 +102,7 @@ private final class CheckPipelineLifecycleRunnable extends AbstractPipelineLifec protected void runBlocking() { jobItemManager.persistProgress(jobItemContext); JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId); - InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType()); + TransmissionJobAPI jobAPI = (TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType()); PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(parentJobId); try { PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker( diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java index 439cc36ad8168..fd958c358956f 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java @@ -18,14 +18,14 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; -import org.apache.shardingsphere.data.pipeline.core.task.runner.InventoryIncrementalTasksRunner; +import org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; @@ -47,7 +47,7 @@ public final class MigrationJob extends AbstractSimplePipelineJob { private final MigrationJobAPI jobAPI = new MigrationJobAPI(); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); @@ -59,10 +59,10 @@ public MigrationJob(final String jobId) { } @Override - protected InventoryIncrementalJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) { + protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) { int shardingItem = shardingContext.getShardingItem(); MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter()); - Optional initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem); + Optional initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem); MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig); MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager); @@ -70,7 +70,7 @@ protected InventoryIncrementalJobItemContext buildPipelineJobItemContext(final S @Override protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) { - return new InventoryIncrementalTasksRunner((InventoryIncrementalJobItemContext) pipelineJobItemContext); + return new TransmissionTasksRunner((TransmissionJobItemContext) pipelineJobItemContext); } @Override diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index e7e4f0bf06ee3..6d9eb9054234e 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager; import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils; @@ -53,8 +53,8 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; @@ -109,7 +109,7 @@ * Migration job API. */ @Slf4j -public final class MigrationJobAPI implements InventoryIncrementalJobAPI { +public final class MigrationJobAPI implements TransmissionJobAPI { private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService(); @@ -276,12 +276,12 @@ private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfi @Override public MigrationProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) { - PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())); + PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())); return new MigrationProcessContext(pipelineJobConfig.getJobId(), processConfig); } @Override - public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext, + public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final TransmissionProcessContext processContext, final ConsistencyCheckJobItemProgressContext progressContext) { return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext, progressContext); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java index c189075c554b0..a8fa521a1a3dc 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java @@ -22,14 +22,14 @@ import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataUtils; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader; @@ -41,7 +41,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker; import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; @@ -72,7 +72,7 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis private final AtomicReference currentTableInventoryChecker = new AtomicReference<>(); - public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext, + public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final TransmissionProcessContext processContext, final ConsistencyCheckJobItemProgressContext progressContext) { this.jobConfig = jobConfig; readRateLimitAlgorithm = null == processContext ? null : processContext.getReadRateLimitAlgorithm(); @@ -99,8 +99,8 @@ public Map check(final String algorithm } private long getRecordsCount() { - Map jobProgress = new InventoryIncrementalJobManager(new MigrationJobAPI()).getJobProgress(jobConfig); - return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum(); + Map jobProgress = new TransmissionJobManager(new MigrationJobAPI()).getJobProgress(jobConfig); + return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getProcessedRecordsCount).sum(); } private void checkTableInventoryData(final JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker, diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java index 3b504d3e81943..dbffa7cb40919 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java @@ -23,11 +23,11 @@ import org.apache.commons.lang3.concurrent.ConcurrentException; import org.apache.commons.lang3.concurrent.LazyInitializer; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink; @@ -46,7 +46,7 @@ */ @Getter @Setter -public final class MigrationJobItemContext implements InventoryIncrementalJobItemContext { +public final class MigrationJobItemContext implements TransmissionJobItemContext { private final String jobId; @@ -58,7 +58,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte private volatile JobStatus status = JobStatus.RUNNING; - private final InventoryIncrementalJobItemProgress initProgress; + private final TransmissionJobItemProgress initProgress; private final MigrationTaskConfiguration taskConfig; @@ -92,7 +92,7 @@ protected PipelineTableMetaDataLoader initialize() throws ConcurrentException { } }; - public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, final int shardingItem, final InventoryIncrementalJobItemProgress initProgress, + public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress initProgress, final MigrationProcessContext jobProcessContext, final MigrationTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager) { this.jobConfig = jobConfig; jobId = jobConfig.getJobId(); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java index b3ab0a79d5983..a382a4ca5cc64 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java @@ -19,13 +19,13 @@ import lombok.Getter; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.AbstractInventoryIncrementalProcessContext; +import org.apache.shardingsphere.data.pipeline.common.context.AbstractTransmissionProcessContext; /** * Migration process context. */ @Getter -public final class MigrationProcessContext extends AbstractInventoryIncrementalProcessContext { +public final class MigrationProcessContext extends AbstractTransmissionProcessContext { /** * Constructor. diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java index 26918b29f9b3f..6bf6e1c075b7f 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java @@ -28,7 +28,7 @@ import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine; import org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressListener; @@ -83,7 +83,7 @@ public final class MigrationJobPreparer { private final MigrationJobAPI jobAPI = new MigrationJobAPI(); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); /** * Do prepare work. @@ -154,7 +154,7 @@ private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext) if (jobItemContext.isSourceTargetDatabaseTheSame()) { prepareTarget(jobItemContext); } - InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress(); + TransmissionJobItemProgress initProgress = jobItemContext.getInitProgress(); if (null == initProgress) { PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig()); PipelineJobPreparerUtils.checkTargetDataSource( diff --git a/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java b/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java index d0e2f69f7e01b..8c74c757c3c99 100644 --- a/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java +++ b/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java @@ -71,7 +71,7 @@ import org.apache.shardingsphere.distsql.segment.AlgorithmSegment; import org.apache.shardingsphere.distsql.segment.DataSourceSegment; import org.apache.shardingsphere.distsql.segment.HostnameAndPortBasedDataSourceSegment; -import org.apache.shardingsphere.distsql.segment.InventoryIncrementalRuleSegment; +import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment; import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment; import org.apache.shardingsphere.distsql.segment.URLBasedDataSourceSegment; import org.apache.shardingsphere.distsql.statement.ral.queryable.ConvertYamlConfigurationStatement; @@ -86,7 +86,7 @@ import org.apache.shardingsphere.distsql.statement.ral.queryable.ShowMigrationRuleStatement; import org.apache.shardingsphere.distsql.statement.ral.queryable.ShowTableMetaDataStatement; import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterComputeNodeStatement; -import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement; +import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterTransmissionRuleStatement; import org.apache.shardingsphere.distsql.statement.ral.updatable.ImportDatabaseConfigurationStatement; import org.apache.shardingsphere.distsql.statement.ral.updatable.ImportMetaDataStatement; import org.apache.shardingsphere.distsql.statement.ral.updatable.LabelComputeNodeStatement; @@ -312,14 +312,14 @@ public ASTNode visitShowMigrationRule(final ShowMigrationRuleContext ctx) { @Override public ASTNode visitAlterMigrationRule(final AlterMigrationRuleContext ctx) { - InventoryIncrementalRuleSegment segment = null == ctx.inventoryIncrementalRule() ? null - : (InventoryIncrementalRuleSegment) visit(ctx.inventoryIncrementalRule()); - return new AlterInventoryIncrementalRuleStatement("MIGRATION", segment); + TransmissionRuleSegment segment = null == ctx.inventoryIncrementalRule() ? null + : (TransmissionRuleSegment) visit(ctx.inventoryIncrementalRule()); + return new AlterTransmissionRuleStatement("MIGRATION", segment); } @Override public ASTNode visitInventoryIncrementalRule(final InventoryIncrementalRuleContext ctx) { - InventoryIncrementalRuleSegment result = new InventoryIncrementalRuleSegment(); + TransmissionRuleSegment result = new TransmissionRuleSegment(); if (null != ctx.readDefinition()) { result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition())); } diff --git a/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/InventoryIncrementalRuleSegment.java b/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/TransmissionRuleSegment.java similarity index 91% rename from parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/InventoryIncrementalRuleSegment.java rename to parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/TransmissionRuleSegment.java index 287097c4c8566..79abb764fd15e 100644 --- a/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/InventoryIncrementalRuleSegment.java +++ b/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/TransmissionRuleSegment.java @@ -22,11 +22,11 @@ import org.apache.shardingsphere.sql.parser.api.ASTNode; /** - * Inventory incremental rule segment. + * Transmission rule segment. */ @Getter @Setter -public final class InventoryIncrementalRuleSegment implements ASTNode { +public final class TransmissionRuleSegment implements ASTNode { private ReadOrWriteSegment readSegment; diff --git a/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/statement/ral/updatable/AlterInventoryIncrementalRuleStatement.java b/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/statement/ral/updatable/AlterTransmissionRuleStatement.java similarity index 79% rename from parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/statement/ral/updatable/AlterInventoryIncrementalRuleStatement.java rename to parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/statement/ral/updatable/AlterTransmissionRuleStatement.java index 96d7836aa7c62..3a6a7fcf1b868 100644 --- a/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/statement/ral/updatable/AlterInventoryIncrementalRuleStatement.java +++ b/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/statement/ral/updatable/AlterTransmissionRuleStatement.java @@ -19,17 +19,17 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.distsql.segment.InventoryIncrementalRuleSegment; +import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment; import org.apache.shardingsphere.distsql.statement.ral.pipeline.migration.UpdatableMigrationRALStatement; /** - * Alter inventory incremental rule statement. + * Alter transmission rule statement. */ @RequiredArgsConstructor @Getter -public final class AlterInventoryIncrementalRuleStatement extends UpdatableMigrationRALStatement { +public final class AlterTransmissionRuleStatement extends UpdatableMigrationRALStatement { private final String jobTypeName; - private final InventoryIncrementalRuleSegment processConfigSegment; + private final TransmissionRuleSegment processConfigSegment; } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java index d9e5391261367..0f3179f15644e 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java @@ -19,8 +19,8 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.distsql.statement.ral.queryable.ShowMigrationRuleStatement; @@ -40,7 +40,7 @@ public final class ShowMigrationRuleExecutor implements QueryableRALExecutor getRows(final ShowMigrationRuleStatement sqlStatement) { - PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")) + PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")) .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY)); Collection result = new LinkedList<>(); result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java similarity index 67% rename from proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java rename to proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java index 52648f2cc95c3..985278f2f603f 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java @@ -19,29 +19,29 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; -import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement; +import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterTransmissionRuleStatement; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; -import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.converter.InventoryIncrementalProcessConfigurationSegmentConverter; +import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.converter.TransmissionProcessConfigurationSegmentConverter; /** - * Alter inventory incremental rule updater. + * Alter transmission rule updater. */ -public final class AlterInventoryIncrementalRuleUpdater implements RALUpdater { +public final class AlterTransmissionRuleUpdater implements RALUpdater { @Override - public void executeUpdate(final String databaseName, final AlterInventoryIncrementalRuleStatement sqlStatement) { - InventoryIncrementalJobManager jobManager = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, sqlStatement.getJobTypeName())); - PipelineProcessConfiguration processConfig = InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment()); + public void executeUpdate(final String databaseName, final AlterTransmissionRuleStatement sqlStatement) { + TransmissionJobManager jobManager = new TransmissionJobManager((TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, sqlStatement.getJobTypeName())); + PipelineProcessConfiguration processConfig = TransmissionProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment()); jobManager.alterProcessConfiguration(new PipelineContextKey(InstanceType.PROXY), processConfig); } @Override - public Class getType() { - return AlterInventoryIncrementalRuleStatement.class; + public Class getType() { + return AlterTransmissionRuleStatement.class; } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/converter/InventoryIncrementalProcessConfigurationSegmentConverter.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/converter/TransmissionProcessConfigurationSegmentConverter.java similarity index 90% rename from proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/converter/InventoryIncrementalProcessConfigurationSegmentConverter.java rename to proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/converter/TransmissionProcessConfigurationSegmentConverter.java index e6c07cfd84e6a..298e0727484bf 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/converter/InventoryIncrementalProcessConfigurationSegmentConverter.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/converter/TransmissionProcessConfigurationSegmentConverter.java @@ -23,15 +23,15 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineReadConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineWriteConfiguration; import org.apache.shardingsphere.distsql.segment.AlgorithmSegment; -import org.apache.shardingsphere.distsql.segment.InventoryIncrementalRuleSegment; +import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment; import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment; import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration; /** - * Inventory incremental process configuration segment converter. + * Transmission process configuration segment converter. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class InventoryIncrementalProcessConfigurationSegmentConverter { +public final class TransmissionProcessConfigurationSegmentConverter { /** * Convert to pipeline process configuration. @@ -39,7 +39,7 @@ public final class InventoryIncrementalProcessConfigurationSegmentConverter { * @param segment process configuration segment * @return pipeline process configuration */ - public static PipelineProcessConfiguration convert(final InventoryIncrementalRuleSegment segment) { + public static PipelineProcessConfiguration convert(final TransmissionRuleSegment segment) { PipelineReadConfiguration readConfig = convertToReadConfiguration(segment.getReadSegment()); PipelineWriteConfiguration writeConfig = convertToWriteConfiguration(segment.getWriteSegment()); AlgorithmConfiguration streamChannel = convertToAlgorithm(segment.getStreamChannel()); diff --git a/proxy/backend/core/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater b/proxy/backend/core/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater index a0fccd573c76f..08d818b269f44 100644 --- a/proxy/backend/core/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater +++ b/proxy/backend/core/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater @@ -15,7 +15,7 @@ # limitations under the License. # -org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.AlterInventoryIncrementalRuleUpdater +org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.AlterTransmissionRuleUpdater org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.LabelComputeNodeUpdater org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.UnlabelComputeNodeUpdater org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.SetInstanceStatusUpdater diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterInventoryIncrementalRuleStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterTransmissionRuleStatementAssert.java similarity index 92% rename from test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterInventoryIncrementalRuleStatementAssert.java rename to test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterTransmissionRuleStatementAssert.java index 7b05ef298a7e0..ea137249f5772 100644 --- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterInventoryIncrementalRuleStatementAssert.java +++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterTransmissionRuleStatementAssert.java @@ -21,9 +21,9 @@ import lombok.NoArgsConstructor; import org.apache.shardingsphere.distsql.segment.AlgorithmSegment; import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment; -import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement; +import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterTransmissionRuleStatement; import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext; -import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterInventoryIncrementalRuleStatementTestCase; +import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterTransmissionRuleStatementTestCase; import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ExpectedAlgorithm; import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedRead; import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedWrite; @@ -34,10 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertNull; /** - * Alter inventory incremental rule statement assert. + * Alter transmission rule statement assert. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) -public class AlterInventoryIncrementalRuleStatementAssert { +public class AlterTransmissionRuleStatementAssert { /** * Assert statement is correct with expected parser result. @@ -46,7 +46,7 @@ public class AlterInventoryIncrementalRuleStatementAssert { * @param actual actual statement * @param expected expected statement test case */ - public static void assertIs(final SQLCaseAssertContext assertContext, final AlterInventoryIncrementalRuleStatement actual, final AlterInventoryIncrementalRuleStatementTestCase expected) { + public static void assertIs(final SQLCaseAssertContext assertContext, final AlterTransmissionRuleStatement actual, final AlterTransmissionRuleStatementTestCase expected) { if (null == expected) { assertNull(actual, assertContext.getText("Actual statement should not exist.")); } else { diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java index d6dfa98ceb65f..2e8d73867d193 100644 --- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java +++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java @@ -21,7 +21,7 @@ import lombok.NoArgsConstructor; import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement; import org.apache.shardingsphere.distsql.statement.ral.pipeline.UpdatablePipelineRALStatement; -import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement; +import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterTransmissionRuleStatement; import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement; import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement; import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement; @@ -44,7 +44,7 @@ import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationCheckStatementAssert; import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationStatementAssert; import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.UnregisterMigrationSourceStorageUnitStatementAssert; -import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterInventoryIncrementalRuleStatementTestCase; +import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterTransmissionRuleStatementTestCase; import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase; import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase; import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase; @@ -95,8 +95,8 @@ public static void assertIs(final SQLCaseAssertContext assertContext, final Upda StartMigrationCheckStatementAssert.assertIs(assertContext, (StartMigrationCheckStatement) actual, (StartMigrationCheckStatementTestCase) expected); } else if (actual instanceof StopMigrationCheckStatement) { StopMigrationCheckStatementAssert.assertIs(assertContext, (StopMigrationCheckStatement) actual, (StopMigrationCheckStatementTestCase) expected); - } else if (actual instanceof AlterInventoryIncrementalRuleStatement) { - AlterInventoryIncrementalRuleStatementAssert.assertIs(assertContext, (AlterInventoryIncrementalRuleStatement) actual, (AlterInventoryIncrementalRuleStatementTestCase) expected); + } else if (actual instanceof AlterTransmissionRuleStatement) { + AlterTransmissionRuleStatementAssert.assertIs(assertContext, (AlterTransmissionRuleStatement) actual, (AlterTransmissionRuleStatementTestCase) expected); } else if (actual instanceof DropStreamingStatement) { DropStreamingStatementAssert.assertIs(assertContext, (DropStreamingStatement) actual, (DropStreamingStatementTestCase) expected); } diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterInventoryIncrementalRuleStatementTestCase.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterTransmissionRuleStatementTestCase.java similarity index 83% rename from test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterInventoryIncrementalRuleStatementTestCase.java rename to test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterTransmissionRuleStatementTestCase.java index 2506f0f11e965..c6c42fbbb2154 100644 --- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterInventoryIncrementalRuleStatementTestCase.java +++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterTransmissionRuleStatementTestCase.java @@ -20,23 +20,23 @@ import lombok.Getter; import lombok.Setter; import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase; -import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedInventoryIncrementalRule; +import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedTransmissionRule; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; /** - * Alter inventory incremental rule statement test case. + * Alter transmission rule statement test case. */ @Getter @Setter @XmlAccessorType(XmlAccessType.FIELD) -public final class AlterInventoryIncrementalRuleStatementTestCase extends SQLParserTestCase { +public final class AlterTransmissionRuleStatementTestCase extends SQLParserTestCase { @XmlElement(name = "job-type-name") private String jobTypeName; @XmlElement(name = "rule") - private ExpectedInventoryIncrementalRule rule; + private ExpectedTransmissionRule rule; } diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java index cecb97b0cdfac..8406f46775be5 100644 --- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java +++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java @@ -20,7 +20,7 @@ import com.google.common.base.Preconditions; import lombok.Getter; import lombok.SneakyThrows; -import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterInventoryIncrementalRuleStatementTestCase; +import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterTransmissionRuleStatementTestCase; import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.CommonStatementTestCase; import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.AlterResourceGroupStatementTestCase; import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.BinlogStatementTestCase; @@ -1053,7 +1053,7 @@ public final class RootSQLParserTestCases { private final List showStreamingRuleTestCases = new LinkedList<>(); @XmlElement(name = "alter-streaming-rule") - private final List alterStreamingRuleTestCases = new LinkedList<>(); + private final List alterStreamingRuleTestCases = new LinkedList<>(); @XmlElement(name = "show-streaming-list") private final List showStreamingListTestCases = new LinkedList<>(); diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedInventoryIncrementalRule.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedTransmissionRule.java similarity index 94% rename from test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedInventoryIncrementalRule.java rename to test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedTransmissionRule.java index 7c33deccd80d1..78b8d7567d95f 100644 --- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedInventoryIncrementalRule.java +++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedTransmissionRule.java @@ -26,12 +26,12 @@ import javax.xml.bind.annotation.XmlElement; /** - * Expected inventory incremental rule. + * Expected transmission rule. */ @Getter @Setter @XmlAccessorType(XmlAccessType.FIELD) -public final class ExpectedInventoryIncrementalRule { +public final class ExpectedTransmissionRule { @XmlElement(name = "read") private ExpectedRead read; diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTransmissionJobItemContext.java similarity index 86% rename from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java rename to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTransmissionJobItemContext.java index 9892aeb47ec04..58948a11ed40d 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTransmissionJobItemContext.java @@ -19,24 +19,24 @@ import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext; -import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import java.util.Collection; -public final class FixtureInventoryIncrementalJobItemContext implements InventoryIncrementalJobItemContext { +public final class FixtureTransmissionJobItemContext implements TransmissionJobItemContext { @Override public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) { } @Override - public InventoryIncrementalProcessContext getJobProcessContext() { + public TransmissionProcessContext getJobProcessContext() { return null; } @@ -51,7 +51,7 @@ public Collection getIncrementalTasks() { } @Override - public InventoryIncrementalJobItemProgress getInitProgress() { + public TransmissionJobItemProgress getInitProgress() { return null; } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java index 7908c1a1f2818..70c5fbdfa7032 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java @@ -35,7 +35,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord; import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; -import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext; +import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureTransmissionJobItemContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -88,7 +88,7 @@ class PipelineDataSourceSinkTest { @BeforeEach void setUp() throws SQLException { PipelineSink pipelineSink = new PipelineDataSourceSink(mockImporterConfiguration(), dataSourceManager); - importer = new SingleChannelConsumerImporter(channel, 100, 1, TimeUnit.SECONDS, pipelineSink, new FixtureInventoryIncrementalJobItemContext()); + importer = new SingleChannelConsumerImporter(channel, 100, 1, TimeUnit.SECONDS, pipelineSink, new FixtureTransmissionJobItemContext()); when(dataSourceManager.getDataSource(dataSourceConfig)).thenReturn(dataSource); when(dataSource.getConnection()).thenReturn(connection); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java index 9c3e236f37ffb..e55320508860b 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java @@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; @@ -96,7 +96,7 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration( result.setSources(sources); result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration( ConfigurationFileUtils.readFile("migration_sharding_sphere_jdbc_target.yaml").replace("${databaseNameSuffix}", databaseNameSuffix)))); - ((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")).extendYamlJobConfiguration(contextKey, result); + ((TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")).extendYamlJobConfiguration(contextKey, result); return result; } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java index b0ff58bbfaf99..1fde78da7f556 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencycheck.api.impl; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; @@ -54,7 +54,7 @@ class ConsistencyCheckJobAPITest { private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new YamlMigrationJobConfigurationSwapper(); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index be69e9dd0eb66..d9fa308b494da 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -24,15 +24,15 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; @@ -92,9 +92,9 @@ class MigrationJobAPITest { private static PipelineJobManager jobManager; - private static InventoryIncrementalJobManager inventoryIncrementalJobManager; + private static TransmissionJobManager transmissionJobManager; - private static PipelineJobItemManager jobItemManager; + private static PipelineJobItemManager jobItemManager; private static DatabaseType databaseType; @@ -103,7 +103,7 @@ static void beforeClass() { PipelineContextUtils.mockModeConfigAndContextManager(); jobAPI = new MigrationJobAPI(); jobManager = new PipelineJobManager(jobAPI); - inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI); + transmissionJobManager = new TransmissionJobManager(jobAPI); jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); String jdbcUrl = "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL"; databaseType = DatabaseTypeFactory.get(jdbcUrl); @@ -174,7 +174,7 @@ void assertGetProgress() { MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); Optional jobId = jobManager.start(jobConfig); assertTrue(jobId.isPresent()); - Map jobProgressMap = inventoryIncrementalJobManager.getJobProgress(jobConfig); + Map jobProgressMap = transmissionJobManager.getJobProgress(jobConfig); assertThat(jobProgressMap.size(), is(1)); } @@ -200,8 +200,8 @@ void assertSwitchClusterConfigurationSucceed() { MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig); jobItemManager.persistProgress(jobItemContext); jobItemManager.updateStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK); - Map progress = inventoryIncrementalJobManager.getJobProgress(jobConfig); - for (Entry entry : progress.entrySet()) { + Map progress = transmissionJobManager.getJobProgress(jobConfig); + for (Entry entry : progress.entrySet()) { assertThat(entry.getValue().getStatus(), is(JobStatus.EXECUTE_INVENTORY_TASK)); } } @@ -233,7 +233,7 @@ void assertRenewJobStatus() { MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig); jobItemManager.persistProgress(jobItemContext); jobItemManager.updateStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED); - Optional actual = jobItemManager.getProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()); + Optional actual = jobItemManager.getProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()); assertTrue(actual.isPresent()); assertThat(actual.get().getStatus(), is(JobStatus.FINISHED)); } @@ -300,13 +300,13 @@ void assertShowMigrationSourceResources() { void assertGetJobItemInfosAtBegin() { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new YamlInventoryIncrementalJobItemProgress(); + YamlTransmissionJobItemProgress yamlJobItemProgress = new YamlTransmissionJobItemProgress(); yamlJobItemProgress.setStatus(JobStatus.RUNNING.name()); yamlJobItemProgress.setSourceDatabaseType("MySQL"); PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress)); - List jobItemInfos = inventoryIncrementalJobManager.getJobItemInfos(jobId.get()); + List jobItemInfos = transmissionJobManager.getJobItemInfos(jobId.get()); assertThat(jobItemInfos.size(), is(1)); - InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0); + TransmissionJobItemInfo jobItemInfo = jobItemInfos.get(0); assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.RUNNING)); assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(0)); } @@ -315,14 +315,14 @@ void assertGetJobItemInfosAtBegin() { void assertGetJobItemInfosAtIncrementTask() { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new YamlInventoryIncrementalJobItemProgress(); + YamlTransmissionJobItemProgress yamlJobItemProgress = new YamlTransmissionJobItemProgress(); yamlJobItemProgress.setSourceDatabaseType("MySQL"); yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name()); yamlJobItemProgress.setProcessedRecordsCount(100); yamlJobItemProgress.setInventoryRecordsCount(50); PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress)); - List jobItemInfos = inventoryIncrementalJobManager.getJobItemInfos(jobId.get()); - InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0); + List jobItemInfos = transmissionJobManager.getJobItemInfos(jobId.get()); + TransmissionJobItemInfo jobItemInfo = jobItemInfos.get(0); assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK)); assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(100)); }