From 90b1f6bf3dbf5fd28ab4f27945c3820c60454fdd Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Fri, 30 Aug 2024 19:46:24 +0800 Subject: [PATCH] Revert "Refactor TransmissionJobItemProgress (#32736)" (#32737) This reverts commit e98467e43f52a80593d0a167523843298d88bd22. --- .../core/job/AbstractSeparablePipelineJob.java | 2 +- .../progress/TransmissionJobItemProgress.java | 18 +++++++++--------- ...YamlTransmissionJobItemProgressSwapper.java | 12 +++++++----- .../PipelineJobProgressDetectorTest.java | 16 ++++++++-------- .../data/pipeline/cdc/api/CDCJobAPI.java | 7 +++++-- 5 files changed, 30 insertions(+), 25 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java index 648665e70b7fb..196034b3549bb 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java @@ -124,7 +124,7 @@ private boolean execute(final I jobItemContext) { protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext); - private void prepare(final I jobItemContext) { + protected final void prepare(final I jobItemContext) { try { doPrepare(jobItemContext); // CHECKSTYLE:OFF diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java index 88876f0763ae6..56c18fa144c58 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java @@ -18,13 +18,13 @@ package org.apache.shardingsphere.data.pipeline.core.job.progress; import lombok.Getter; -import lombok.RequiredArgsConstructor; +import lombok.NoArgsConstructor; import lombok.Setter; import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.core.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; import org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress; +import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import java.util.Collection; @@ -34,22 +34,22 @@ /** * Transmission job item progress. */ -@RequiredArgsConstructor +@NoArgsConstructor @Getter @Setter public final class TransmissionJobItemProgress implements PipelineJobItemProgress { - private final DatabaseType sourceDatabaseType; + private DatabaseType sourceDatabaseType; - private final String dataSourceName; + private String dataSourceName; - private final JobItemInventoryTasksProgress inventory; + private JobItemInventoryTasksProgress inventory; - private final JobItemIncrementalTasksProgress incremental; + private JobItemIncrementalTasksProgress incremental; - private final long inventoryRecordsCount; + private long inventoryRecordsCount; - private final long processedRecordsCount; + private long processedRecordsCount; private boolean active; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java index 4082d0dc48796..43e63745c3aa8 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java @@ -47,12 +47,14 @@ public YamlTransmissionJobItemProgress swapToYamlConfiguration(final Transmissio @Override public TransmissionJobItemProgress swapToObject(final YamlTransmissionJobItemProgress yamlProgress) { - TransmissionJobItemProgress result = new TransmissionJobItemProgress( - TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType()), - yamlProgress.getDataSourceName(), inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()), - incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental()), - yamlProgress.getInventoryRecordsCount(), yamlProgress.getProcessedRecordsCount()); + TransmissionJobItemProgress result = new TransmissionJobItemProgress(); result.setStatus(JobStatus.valueOf(yamlProgress.getStatus())); + result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType())); + result.setDataSourceName(yamlProgress.getDataSourceName()); + result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory())); + result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental())); + result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount()); + result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount()); return result; } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java index 850cf0d1991f4..9a3bfdd59711b 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java @@ -34,7 +34,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; class PipelineJobProgressDetectorTest { @@ -68,30 +67,31 @@ void assertIsInventoryFinishedWhenCollectionElementIsNull() { @Test void assertIsInventoryFinishedWhenJobCountDoesNotMatchJobItemProgresses() { - assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, Collections.singleton(mock(TransmissionJobItemProgress.class)))); + TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); + assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, Collections.singleton(transmissionJobItemProgress))); } @Test void assertIsInventoryFinishedWhenInventoryTaskProgressHasEmptyMap() { JobItemInventoryTasksProgress jobItemInventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.emptyMap()); - TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class); - when(transmissionJobItemProgress.getInventory()).thenReturn(jobItemInventoryTasksProgress); + TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); + transmissionJobItemProgress.setInventory(jobItemInventoryTasksProgress); assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress))); } @Test void assertIsInventoryFinishedWhenNotAllInventoryTasksCompleted() { JobItemInventoryTasksProgress inventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new InventoryTaskProgress(new IngestPlaceholderPosition()))); - TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class); - when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress); + TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); + transmissionJobItemProgress.setInventory(inventoryTasksProgress); assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress))); } @Test void assertIsInventoryFinished() { JobItemInventoryTasksProgress inventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new InventoryTaskProgress(new IngestFinishedPosition()))); - TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class); - when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress); + TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); + transmissionJobItemProgress.setInventory(inventoryTasksProgress); assertTrue(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress))); } } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java index b8d5447ce3fed..2e9d6187cf89d 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java @@ -207,10 +207,13 @@ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jo private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager, final IncrementalDumperContext incrementalDumperContext) throws SQLException { + TransmissionJobItemProgress result = new TransmissionJobItemProgress(); + result.setSourceDatabaseType(jobConfig.getSourceDatabaseType()); + result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName()); IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()); IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager)); - return new TransmissionJobItemProgress(jobConfig.getSourceDatabaseType(), incrementalDumperContext.getCommonContext().getDataSourceName(), null, - new JobItemIncrementalTasksProgress(incrementalTaskProgress), 0L, 0L); + result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress)); + return result; } /**