Skip to content

Commit

Permalink
Rename InventoryIncrementalJob to TransmissionJob (#29147)
Browse files Browse the repository at this point in the history
* Rename InventoryIncrementalJob to TransmissionJob

* Rename InventoryIncrementalJob to TransmissionJob
  • Loading branch information
terrymanu authored Nov 24, 2023
1 parent bdedd6b commit da8368c
Show file tree
Hide file tree
Showing 53 changed files with 274 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

/**
* Abstract inventory incremental process context.
* Abstract transmission process context.
*/
@Getter
public abstract class AbstractInventoryIncrementalProcessContext implements InventoryIncrementalProcessContext {
public abstract class AbstractTransmissionProcessContext implements TransmissionProcessContext {

private final PipelineProcessConfiguration pipelineProcessConfig;

Expand All @@ -51,7 +51,7 @@ public abstract class AbstractInventoryIncrementalProcessContext implements Inve

private final PipelineLazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;

protected AbstractInventoryIncrementalProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
protected AbstractTransmissionProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
this.pipelineProcessConfig = processConfig;
PipelineReadConfiguration readConfig = processConfig.getRead();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.common.context;

import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
Expand All @@ -26,12 +26,12 @@
import java.util.Collection;

/**
* Inventory incremental job item context.
* Transmission job item context.
*/
public interface InventoryIncrementalJobItemContext extends PipelineJobItemContext, PipelineJobProgressListener {
public interface TransmissionJobItemContext extends PipelineJobItemContext, PipelineJobProgressListener {

@Override
InventoryIncrementalProcessContext getJobProcessContext();
TransmissionProcessContext getJobProcessContext();

/**
* Get inventory tasks.
Expand All @@ -52,7 +52,7 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
*
* @return init progress
*/
InventoryIncrementalJobItemProgress getInitProgress();
TransmissionJobItemProgress getInitProgress();

/**
* Get source meta data loader.
Expand Down Expand Up @@ -90,7 +90,7 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
long getInventoryRecordsCount();

@Override
default InventoryIncrementalJobItemProgress toProgress() {
return new InventoryIncrementalJobItemProgress(this);
default TransmissionJobItemProgress toProgress() {
return new TransmissionJobItemProgress(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;

/**
* Inventory incremental process context.
* Transmission process context.
*/
public interface InventoryIncrementalProcessContext extends PipelineProcessContext {
public interface TransmissionProcessContext extends PipelineProcessContext {

/**
* Get pipeline channel creator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
Expand All @@ -32,12 +32,12 @@
import java.util.Map;

/**
* Inventory incremental job item progress.
* Transmission job item progress.
*/
@NoArgsConstructor
@Getter
@Setter
public final class InventoryIncrementalJobItemProgress implements PipelineJobItemProgress {
public final class TransmissionJobItemProgress implements PipelineJobItemProgress {

private DatabaseType sourceDatabaseType;

Expand All @@ -55,7 +55,7 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte

private JobStatus status = JobStatus.RUNNING;

public InventoryIncrementalJobItemProgress(final InventoryIncrementalJobItemContext context) {
public TransmissionJobItemProgress(final TransmissionJobItemContext context) {
sourceDatabaseType = context.getJobConfig().getSourceDatabaseType();
dataSourceName = context.getDataSourceName();
inventory = getInventoryTasksProgress(context.getInventoryTasks());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;

/**
* YAML inventory incremental job item progress.
* YAML transmission job item progress.
*/
@Getter
@Setter
public final class YamlInventoryIncrementalJobItemProgress implements YamlPipelineJobItemProgressConfiguration {
public final class YamlTransmissionJobItemProgress implements YamlPipelineJobItemProgressConfiguration {

private String status;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@
package org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;

import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

/**
* YAML inventory incremental job item progress swapper.
* YAML transmission job item progress swapper.
*/
public final class YamlInventoryIncrementalJobItemProgressSwapper implements YamlPipelineJobItemProgressSwapper<YamlInventoryIncrementalJobItemProgress, InventoryIncrementalJobItemProgress> {
public final class YamlTransmissionJobItemProgressSwapper implements YamlPipelineJobItemProgressSwapper<YamlTransmissionJobItemProgress, TransmissionJobItemProgress> {

private final YamlJobItemInventoryTasksProgressSwapper inventoryTasksProgressSwapper = new YamlJobItemInventoryTasksProgressSwapper();

private final YamlJobItemIncrementalTasksProgressSwapper incrementalTasksProgressSwapper = new YamlJobItemIncrementalTasksProgressSwapper();

@Override
public YamlInventoryIncrementalJobItemProgress swapToYamlConfiguration(final InventoryIncrementalJobItemProgress progress) {
YamlInventoryIncrementalJobItemProgress result = new YamlInventoryIncrementalJobItemProgress();
public YamlTransmissionJobItemProgress swapToYamlConfiguration(final TransmissionJobItemProgress progress) {
YamlTransmissionJobItemProgress result = new YamlTransmissionJobItemProgress();
result.setStatus(progress.getStatus().name());
result.setSourceDatabaseType(progress.getSourceDatabaseType().getType());
result.setDataSourceName(progress.getDataSourceName());
Expand All @@ -46,8 +46,8 @@ public YamlInventoryIncrementalJobItemProgress swapToYamlConfiguration(final Inv
}

@Override
public InventoryIncrementalJobItemProgress swapToObject(final YamlInventoryIncrementalJobItemProgress yamlProgress) {
InventoryIncrementalJobItemProgress result = new InventoryIncrementalJobItemProgress();
public TransmissionJobItemProgress swapToObject(final YamlTransmissionJobItemProgress yamlProgress) {
TransmissionJobItemProgress result = new TransmissionJobItemProgress();
result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType()));
result.setDataSourceName(yamlProgress.getDataSourceName());
Expand All @@ -59,7 +59,7 @@ public InventoryIncrementalJobItemProgress swapToObject(final YamlInventoryIncre
}

@Override
public Class<YamlInventoryIncrementalJobItemProgress> getYamlProgressClass() {
return YamlInventoryIncrementalJobItemProgress.class;
public Class<YamlTransmissionJobItemProgress> getYamlProgressClass() {
return YamlTransmissionJobItemProgress.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;

/**
* Inventory incremental job item info.
* Transmission job item info.
*/
@RequiredArgsConstructor
@Getter
public class InventoryIncrementalJobItemInfo {
public class TransmissionJobItemInfo {

private final int shardingItem;

private final String tableNames;

private final InventoryIncrementalJobItemProgress jobItemProgress;
private final TransmissionJobItemProgress jobItemProgress;

private final long startTimeMillis;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;

import java.util.Collection;
Expand Down Expand Up @@ -54,15 +54,15 @@ public static boolean isAllInventoryTasksFinished(final Collection<PipelineTask>
* @param jobItemProgresses job item progresses
* @return finished or not
*/
public static boolean isInventoryFinished(final int jobShardingCount, final Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
public static boolean isInventoryFinished(final int jobShardingCount, final Collection<TransmissionJobItemProgress> jobItemProgresses) {
return isAllProgressesFilled(jobShardingCount, jobItemProgresses) && isAllInventoryTasksCompleted(jobItemProgresses);
}

private static boolean isAllProgressesFilled(final int jobShardingCount, final Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
private static boolean isAllProgressesFilled(final int jobShardingCount, final Collection<TransmissionJobItemProgress> jobItemProgresses) {
return jobShardingCount == jobItemProgresses.size() && jobItemProgresses.stream().allMatch(Objects::nonNull);
}

private static boolean isAllInventoryTasksCompleted(final Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
private static boolean isAllInventoryTasksCompleted(final Collection<TransmissionJobItemProgress> jobItemProgresses) {
if (jobItemProgresses.stream().allMatch(each -> each.getInventory().getProgresses().isEmpty())) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ public void drop(final String jobId) {
* @return jobs info
*/
public List<PipelineJobInfo> getJobInfos(final PipelineContextKey contextKey) {
if (jobAPI instanceof InventoryIncrementalJobAPI) {
if (jobAPI instanceof TransmissionJobAPI) {
return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
.filter(each -> !each.getJobName().startsWith("_") && jobAPI.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
.map(each -> ((InventoryIncrementalJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
.map(each -> ((TransmissionJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
}
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
Expand All @@ -31,14 +31,14 @@
import java.sql.SQLException;

/**
* Inventory incremental job API.
* Transmission job API.
*/
public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
public interface TransmissionJobAPI extends PipelineJobAPI {

@SuppressWarnings("unchecked")
@Override
default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwapper() {
return new YamlInventoryIncrementalJobItemProgressSwapper();
default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
return new YamlTransmissionJobItemProgressSwapper();
}

/**
Expand All @@ -65,7 +65,7 @@ default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwa
* @param pipelineJobConfig pipeline job configuration
* @return pipeline process context
*/
InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
TransmissionProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);

/**
* Extend YAML job configuration.
Expand All @@ -83,7 +83,7 @@ default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwa
* @param progressContext consistency check job item progress context
* @return all logic tables check result
*/
PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext,
PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, TransmissionProcessContext processContext,
ConsistencyCheckJobItemProgressContext progressContext);

/**
Expand Down
Loading

0 comments on commit da8368c

Please sign in to comment.