Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename InventoryIncrementalJob to TransmissionJob #29147

Merged
merged 2 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

/**
* Abstract inventory incremental process context.
* Abstract transmission process context.
*/
@Getter
public abstract class AbstractInventoryIncrementalProcessContext implements InventoryIncrementalProcessContext {
public abstract class AbstractTransmissionProcessContext implements TransmissionProcessContext {

private final PipelineProcessConfiguration pipelineProcessConfig;

Expand All @@ -51,7 +51,7 @@ public abstract class AbstractInventoryIncrementalProcessContext implements Inve

private final PipelineLazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;

protected AbstractInventoryIncrementalProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
protected AbstractTransmissionProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
this.pipelineProcessConfig = processConfig;
PipelineReadConfiguration readConfig = processConfig.getRead();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.common.context;

import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
Expand All @@ -26,12 +26,12 @@
import java.util.Collection;

/**
* Inventory incremental job item context.
* Transmission job item context.
*/
public interface InventoryIncrementalJobItemContext extends PipelineJobItemContext, PipelineJobProgressListener {
public interface TransmissionJobItemContext extends PipelineJobItemContext, PipelineJobProgressListener {

@Override
InventoryIncrementalProcessContext getJobProcessContext();
TransmissionProcessContext getJobProcessContext();

/**
* Get inventory tasks.
Expand All @@ -52,7 +52,7 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
*
* @return init progress
*/
InventoryIncrementalJobItemProgress getInitProgress();
TransmissionJobItemProgress getInitProgress();

/**
* Get source meta data loader.
Expand Down Expand Up @@ -90,7 +90,7 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
long getInventoryRecordsCount();

@Override
default InventoryIncrementalJobItemProgress toProgress() {
return new InventoryIncrementalJobItemProgress(this);
default TransmissionJobItemProgress toProgress() {
return new TransmissionJobItemProgress(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;

/**
* Inventory incremental process context.
* Transmission process context.
*/
public interface InventoryIncrementalProcessContext extends PipelineProcessContext {
public interface TransmissionProcessContext extends PipelineProcessContext {

/**
* Get pipeline channel creator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
Expand All @@ -32,12 +32,12 @@
import java.util.Map;

/**
* Inventory incremental job item progress.
* Transmission job item progress.
*/
@NoArgsConstructor
@Getter
@Setter
public final class InventoryIncrementalJobItemProgress implements PipelineJobItemProgress {
public final class TransmissionJobItemProgress implements PipelineJobItemProgress {

private DatabaseType sourceDatabaseType;

Expand All @@ -55,7 +55,7 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte

private JobStatus status = JobStatus.RUNNING;

public InventoryIncrementalJobItemProgress(final InventoryIncrementalJobItemContext context) {
public TransmissionJobItemProgress(final TransmissionJobItemContext context) {
sourceDatabaseType = context.getJobConfig().getSourceDatabaseType();
dataSourceName = context.getDataSourceName();
inventory = getInventoryTasksProgress(context.getInventoryTasks());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;

/**
* YAML inventory incremental job item progress.
* YAML transmission job item progress.
*/
@Getter
@Setter
public final class YamlInventoryIncrementalJobItemProgress implements YamlPipelineJobItemProgressConfiguration {
public final class YamlTransmissionJobItemProgress implements YamlPipelineJobItemProgressConfiguration {

private String status;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@
package org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;

import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

/**
* YAML inventory incremental job item progress swapper.
* YAML transmission job item progress swapper.
*/
public final class YamlInventoryIncrementalJobItemProgressSwapper implements YamlPipelineJobItemProgressSwapper<YamlInventoryIncrementalJobItemProgress, InventoryIncrementalJobItemProgress> {
public final class YamlTransmissionJobItemProgressSwapper implements YamlPipelineJobItemProgressSwapper<YamlTransmissionJobItemProgress, TransmissionJobItemProgress> {

private final YamlJobItemInventoryTasksProgressSwapper inventoryTasksProgressSwapper = new YamlJobItemInventoryTasksProgressSwapper();

private final YamlJobItemIncrementalTasksProgressSwapper incrementalTasksProgressSwapper = new YamlJobItemIncrementalTasksProgressSwapper();

@Override
public YamlInventoryIncrementalJobItemProgress swapToYamlConfiguration(final InventoryIncrementalJobItemProgress progress) {
YamlInventoryIncrementalJobItemProgress result = new YamlInventoryIncrementalJobItemProgress();
public YamlTransmissionJobItemProgress swapToYamlConfiguration(final TransmissionJobItemProgress progress) {
YamlTransmissionJobItemProgress result = new YamlTransmissionJobItemProgress();
result.setStatus(progress.getStatus().name());
result.setSourceDatabaseType(progress.getSourceDatabaseType().getType());
result.setDataSourceName(progress.getDataSourceName());
Expand All @@ -46,8 +46,8 @@ public YamlInventoryIncrementalJobItemProgress swapToYamlConfiguration(final Inv
}

@Override
public InventoryIncrementalJobItemProgress swapToObject(final YamlInventoryIncrementalJobItemProgress yamlProgress) {
InventoryIncrementalJobItemProgress result = new InventoryIncrementalJobItemProgress();
public TransmissionJobItemProgress swapToObject(final YamlTransmissionJobItemProgress yamlProgress) {
TransmissionJobItemProgress result = new TransmissionJobItemProgress();
result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType()));
result.setDataSourceName(yamlProgress.getDataSourceName());
Expand All @@ -59,7 +59,7 @@ public InventoryIncrementalJobItemProgress swapToObject(final YamlInventoryIncre
}

@Override
public Class<YamlInventoryIncrementalJobItemProgress> getYamlProgressClass() {
return YamlInventoryIncrementalJobItemProgress.class;
public Class<YamlTransmissionJobItemProgress> getYamlProgressClass() {
return YamlTransmissionJobItemProgress.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;

/**
* Inventory incremental job item info.
* Transmission job item info.
*/
@RequiredArgsConstructor
@Getter
public class InventoryIncrementalJobItemInfo {
public class TransmissionJobItemInfo {

private final int shardingItem;

private final String tableNames;

private final InventoryIncrementalJobItemProgress jobItemProgress;
private final TransmissionJobItemProgress jobItemProgress;

private final long startTimeMillis;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;

import java.util.Collection;
Expand Down Expand Up @@ -54,15 +54,15 @@ public static boolean isAllInventoryTasksFinished(final Collection<PipelineTask>
* @param jobItemProgresses job item progresses
* @return finished or not
*/
public static boolean isInventoryFinished(final int jobShardingCount, final Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
public static boolean isInventoryFinished(final int jobShardingCount, final Collection<TransmissionJobItemProgress> jobItemProgresses) {
return isAllProgressesFilled(jobShardingCount, jobItemProgresses) && isAllInventoryTasksCompleted(jobItemProgresses);
}

private static boolean isAllProgressesFilled(final int jobShardingCount, final Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
private static boolean isAllProgressesFilled(final int jobShardingCount, final Collection<TransmissionJobItemProgress> jobItemProgresses) {
return jobShardingCount == jobItemProgresses.size() && jobItemProgresses.stream().allMatch(Objects::nonNull);
}

private static boolean isAllInventoryTasksCompleted(final Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
private static boolean isAllInventoryTasksCompleted(final Collection<TransmissionJobItemProgress> jobItemProgresses) {
if (jobItemProgresses.stream().allMatch(each -> each.getInventory().getProgresses().isEmpty())) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ public void drop(final String jobId) {
* @return jobs info
*/
public List<PipelineJobInfo> getJobInfos(final PipelineContextKey contextKey) {
if (jobAPI instanceof InventoryIncrementalJobAPI) {
if (jobAPI instanceof TransmissionJobAPI) {
return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
.filter(each -> !each.getJobName().startsWith("_") && jobAPI.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
.map(each -> ((InventoryIncrementalJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
.map(each -> ((TransmissionJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
}
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
Expand All @@ -31,14 +31,14 @@
import java.sql.SQLException;

/**
* Inventory incremental job API.
* Transmission job API.
*/
public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
public interface TransmissionJobAPI extends PipelineJobAPI {

@SuppressWarnings("unchecked")
@Override
default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwapper() {
return new YamlInventoryIncrementalJobItemProgressSwapper();
default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
return new YamlTransmissionJobItemProgressSwapper();
}

/**
Expand All @@ -65,7 +65,7 @@ default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwa
* @param pipelineJobConfig pipeline job configuration
* @return pipeline process context
*/
InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
TransmissionProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);

/**
* Extend YAML job configuration.
Expand All @@ -83,7 +83,7 @@ default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwa
* @param progressContext consistency check job item progress context
* @return all logic tables check result
*/
PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext,
PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, TransmissionProcessContext processContext,
ConsistencyCheckJobItemProgressContext progressContext);

/**
Expand Down
Loading