Skip to content

Commit

Permalink
Revert "Refactor TransmissionJobItemProgress (apache#32736)" (apache#…
Browse files Browse the repository at this point in the history
…32737)

This reverts commit e98467e.
  • Loading branch information
terrymanu authored Aug 30, 2024
1 parent e98467e commit 90b1f6b
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private boolean execute(final I jobItemContext) {

protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);

private void prepare(final I jobItemContext) {
protected final void prepare(final I jobItemContext) {
try {
doPrepare(jobItemContext);
// CHECKSTYLE:OFF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
package org.apache.shardingsphere.data.pipeline.core.job.progress;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

import java.util.Collection;
Expand All @@ -34,22 +34,22 @@
/**
* Transmission job item progress.
*/
@RequiredArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public final class TransmissionJobItemProgress implements PipelineJobItemProgress {

private final DatabaseType sourceDatabaseType;
private DatabaseType sourceDatabaseType;

private final String dataSourceName;
private String dataSourceName;

private final JobItemInventoryTasksProgress inventory;
private JobItemInventoryTasksProgress inventory;

private final JobItemIncrementalTasksProgress incremental;
private JobItemIncrementalTasksProgress incremental;

private final long inventoryRecordsCount;
private long inventoryRecordsCount;

private final long processedRecordsCount;
private long processedRecordsCount;

private boolean active;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ public YamlTransmissionJobItemProgress swapToYamlConfiguration(final Transmissio

@Override
public TransmissionJobItemProgress swapToObject(final YamlTransmissionJobItemProgress yamlProgress) {
TransmissionJobItemProgress result = new TransmissionJobItemProgress(
TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType()),
yamlProgress.getDataSourceName(), inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()),
incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental()),
yamlProgress.getInventoryRecordsCount(), yamlProgress.getProcessedRecordsCount());
TransmissionJobItemProgress result = new TransmissionJobItemProgress();
result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType()));
result.setDataSourceName(yamlProgress.getDataSourceName());
result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()));
result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental()));
result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount());
result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount());
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class PipelineJobProgressDetectorTest {

Expand Down Expand Up @@ -68,30 +67,31 @@ void assertIsInventoryFinishedWhenCollectionElementIsNull() {

@Test
void assertIsInventoryFinishedWhenJobCountDoesNotMatchJobItemProgresses() {
assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, Collections.singleton(mock(TransmissionJobItemProgress.class))));
TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress();
assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, Collections.singleton(transmissionJobItemProgress)));
}

@Test
void assertIsInventoryFinishedWhenInventoryTaskProgressHasEmptyMap() {
JobItemInventoryTasksProgress jobItemInventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.emptyMap());
TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class);
when(transmissionJobItemProgress.getInventory()).thenReturn(jobItemInventoryTasksProgress);
TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress();
transmissionJobItemProgress.setInventory(jobItemInventoryTasksProgress);
assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress)));
}

@Test
void assertIsInventoryFinishedWhenNotAllInventoryTasksCompleted() {
JobItemInventoryTasksProgress inventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new InventoryTaskProgress(new IngestPlaceholderPosition())));
TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class);
when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress);
TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress();
transmissionJobItemProgress.setInventory(inventoryTasksProgress);
assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress)));
}

@Test
void assertIsInventoryFinished() {
JobItemInventoryTasksProgress inventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new InventoryTaskProgress(new IngestFinishedPosition())));
TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class);
when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress);
TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress();
transmissionJobItemProgress.setInventory(inventoryTasksProgress);
assertTrue(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,13 @@ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jo

private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager,
final IncrementalDumperContext incrementalDumperContext) throws SQLException {
TransmissionJobItemProgress result = new TransmissionJobItemProgress();
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager));
return new TransmissionJobItemProgress(jobConfig.getSourceDatabaseType(), incrementalDumperContext.getCommonContext().getDataSourceName(), null,
new JobItemIncrementalTasksProgress(incrementalTaskProgress), 0L, 0L);
result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
return result;
}

/**
Expand Down

0 comments on commit 90b1f6b

Please sign in to comment.