From a36b614d326d924a5bb33e8462b2d83fedac37d5 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sun, 24 Dec 2023 23:50:52 +0800 Subject: [PATCH] Refactor package of pipeline.channel (#29530) * Refactor PipelineChannelCreator * Add ack package * Rename PipelineChannelAckCallback * Rename PipelineChannelAckCallback * Remove EmptyPipelineChannelAckCallback * Rename PipelineChannelAckCallbackUtils * Remove PipelineChannelAckCallbackUtils * Add IncrementalTaskAckCallback * Add InventoryTaskAckCallback * Refactor package of pipeline.channel * Refactor package of pipeline.channel * Refactor MultiplexMemoryPipelineChannel --- ...k.java => PipelineChannelAckCallback.java} | 8 ++--- .../core/channel/PipelineChannelCreator.java | 6 ++-- .../memory/MemoryPipelineChannelCreator.java | 6 ++-- .../MultiplexMemoryPipelineChannel.java | 22 ++++++------ .../memory/SimpleMemoryPipelineChannel.java | 6 ++-- .../IncrementalTaskAckCallback.java} | 34 +++++-------------- .../InventoryTaskAckCallback.java} | 14 ++++++-- .../pipeline/core/task/PipelineTaskUtils.java | 9 +++-- .../MemoryPipelineChannelCreatorTest.java | 6 ++-- .../MultiplexMemoryPipelineChannelTest.java | 4 +-- .../SimpleMemoryPipelineChannelTest.java | 9 +++-- .../ingest/MySQLIncrementalDumperTest.java | 7 ++-- .../ingest/PostgreSQLWALDumperTest.java | 7 ++-- 13 files changed, 67 insertions(+), 71 deletions(-) rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/{AckCallback.java => PipelineChannelAckCallback.java} (85%) rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/{channel/AckCallbacks.java => task/IncrementalTaskAckCallback.java} (61%) rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/{channel/EmptyAckCallback.java => task/InventoryTaskAckCallback.java} (62%) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/AckCallback.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelAckCallback.java similarity index 85% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/AckCallback.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelAckCallback.java index 0f96d5d71178f..88f7b9d7bb0c2 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/AckCallback.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelAckCallback.java @@ -22,14 +22,14 @@ import java.util.List; /** - * Record acknowledged callback. + * Pipeline channel acknowledged callback. */ -public interface AckCallback { +public interface PipelineChannelAckCallback { /** - * Call after record acknowledged. + * Call after records acknowledged. * - * @param records acknowledged record list + * @param records acknowledged records */ void onAck(List records); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelCreator.java index e890c6dad0d87..09c3d12aeb52b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelCreator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/PipelineChannelCreator.java @@ -25,12 +25,12 @@ public interface PipelineChannelCreator extends TypedSPI { /** - * Create pipeline channel. + * Create new instance of pipeline channel. * * @param outputConcurrency output concurrency * @param averageElementSize average element size, affect the size of the queue * @param ackCallback ack callback - * @return {@link PipelineChannel} + * @return created instance */ - PipelineChannel createPipelineChannel(int outputConcurrency, int averageElementSize, AckCallback ackCallback); + PipelineChannel newInstance(int outputConcurrency, int averageElementSize, PipelineChannelAckCallback ackCallback); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java index f4f782537eda8..1ed97fbc18732 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreator.java @@ -19,12 +19,12 @@ import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator; -import org.apache.shardingsphere.data.pipeline.core.channel.AckCallback; +import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback; import java.util.Properties; /** - * Memory implementation of pipeline channel creator. + * Pipeline channel creator of memory. */ public final class MemoryPipelineChannelCreator implements PipelineChannelCreator { @@ -40,7 +40,7 @@ public void init(final Properties props) { } @Override - public PipelineChannel createPipelineChannel(final int outputConcurrency, final int averageElementSize, final AckCallback ackCallback) { + public PipelineChannel newInstance(final int outputConcurrency, final int averageElementSize, final PipelineChannelAckCallback ackCallback) { return 1 == outputConcurrency ? new SimpleMemoryPipelineChannel(blockQueueSize / averageElementSize, ackCallback) : new MultiplexMemoryPipelineChannel(outputConcurrency, blockQueueSize, ackCallback); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannel.java index 68dc02a87f0fd..e664dde56f8e1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannel.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannel.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.core.channel.memory; import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType; -import org.apache.shardingsphere.data.pipeline.core.channel.AckCallback; +import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback; import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord; @@ -38,43 +38,43 @@ */ public final class MultiplexMemoryPipelineChannel implements PipelineChannel { - private final int channelNumber; + private final int channelCount; private final List channels; private final Map channelAssignment = new HashMap<>(); - public MultiplexMemoryPipelineChannel(final int channelNumber, final int blockQueueSize, final AckCallback ackCallback) { - this.channelNumber = channelNumber; + public MultiplexMemoryPipelineChannel(final int channelCount, final int blockQueueSize, final PipelineChannelAckCallback ackCallback) { + this.channelCount = channelCount; int handledQueueSize = blockQueueSize < 1 ? 5 : blockQueueSize; - channels = IntStream.range(0, channelNumber).mapToObj(each -> new SimpleMemoryPipelineChannel(handledQueueSize, ackCallback)).collect(Collectors.toList()); + channels = IntStream.range(0, channelCount).mapToObj(each -> new SimpleMemoryPipelineChannel(handledQueueSize, ackCallback)).collect(Collectors.toList()); } @Override public void push(final List records) { Record firstRecord = records.get(0); if (1 == records.size()) { - pushRecord(firstRecord); + push(firstRecord); return; } long insertDataRecordsCount = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count(); if (insertDataRecordsCount == records.size()) { - channels.get(Math.abs(firstRecord.hashCode() % channelNumber)).push(records); + channels.get(Math.abs(firstRecord.hashCode() % channelCount)).push(records); return; } for (Record each : records) { - pushRecord(each); + push(each); } } - private void pushRecord(final Record ingestedRecord) { + private void push(final Record ingestedRecord) { List records = Collections.singletonList(ingestedRecord); if (ingestedRecord instanceof FinishedRecord) { - for (int i = 0; i < channelNumber; i++) { + for (int i = 0; i < channelCount; i++) { channels.get(i).push(records); } } else if (DataRecord.class.equals(ingestedRecord.getClass())) { - channels.get(Math.abs(ingestedRecord.hashCode() % channelNumber)).push(records); + channels.get(Math.abs(ingestedRecord.hashCode() % channelCount)).push(records); } else if (PlaceholderRecord.class.equals(ingestedRecord.getClass())) { channels.get(0).push(records); } else { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/SimpleMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/SimpleMemoryPipelineChannel.java index 8eea7a8016ee6..731f1be8b9d2b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/SimpleMemoryPipelineChannel.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/SimpleMemoryPipelineChannel.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.core.channel.memory; import lombok.SneakyThrows; -import org.apache.shardingsphere.data.pipeline.core.channel.AckCallback; +import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback; import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; @@ -37,9 +37,9 @@ public final class SimpleMemoryPipelineChannel implements PipelineChannel { private final BlockingQueue> queue; - private final AckCallback ackCallback; + private final PipelineChannelAckCallback ackCallback; - public SimpleMemoryPipelineChannel(final int blockQueueSize, final AckCallback ackCallback) { + public SimpleMemoryPipelineChannel(final int blockQueueSize, final PipelineChannelAckCallback ackCallback) { queue = blockQueueSize < 1 ? new SynchronousQueue<>(true) : new ArrayBlockingQueue<>(blockQueueSize, true); this.ackCallback = ackCallback; } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/AckCallbacks.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskAckCallback.java similarity index 61% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/AckCallbacks.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskAckCallback.java index 28578bc45a0f9..abcd8e5d360e0 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/AckCallbacks.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskAckCallback.java @@ -15,42 +15,26 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.channel; +package org.apache.shardingsphere.data.pipeline.core.task; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback; import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition; import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; /** - * Ack callback utilities. + * Incremental task acknowledged callback. */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class AckCallbacks { +@RequiredArgsConstructor +public final class IncrementalTaskAckCallback implements PipelineChannelAckCallback { - /** - * Ack callback for inventory dump. - * - * @param records record list - * @param position ingest position - */ - public static void inventoryCallback(final List records, final AtomicReference position) { - Record lastRecord = records.get(records.size() - 1); - position.set(lastRecord.getPosition()); - } + private final IncrementalTaskProgress progress; - /** - * Ack callback for incremental dump. - * - * @param records record list - * @param progress incremental task progress - */ - public static void incrementalCallback(final List records, final IncrementalTaskProgress progress) { + @Override + public void onAck(final List records) { Record lastHandledRecord = records.get(records.size() - 1); if (!(lastHandledRecord.getPosition() instanceof IngestPlaceholderPosition)) { progress.setPosition(lastHandledRecord.getPosition()); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/EmptyAckCallback.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskAckCallback.java similarity index 62% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/EmptyAckCallback.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskAckCallback.java index 8e219cae8694a..fd1202b1896b1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/EmptyAckCallback.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskAckCallback.java @@ -15,18 +15,26 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.channel; +package org.apache.shardingsphere.data.pipeline.core.task; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; /** - * Empty implementation of record acknowledged callback. + * Inventory task acknowledged callback. */ -public final class EmptyAckCallback implements AckCallback { +@RequiredArgsConstructor +public final class InventoryTaskAckCallback implements PipelineChannelAckCallback { + + private final AtomicReference position; @Override public void onAck(final List records) { + position.set(records.get(records.size() - 1).getPosition()); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java index 99fdc43d4aa44..6c6f222f47257 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java @@ -19,11 +19,10 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel; -import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; -import org.apache.shardingsphere.data.pipeline.core.channel.AckCallbacks; import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; @@ -72,7 +71,7 @@ public static IncrementalTaskProgress createIncrementalTaskProgress(final Ingest * @return channel */ public static PipelineChannel createInventoryChannel(final PipelineChannelCreator pipelineChannelCreator, final int averageElementSize, final AtomicReference position) { - return pipelineChannelCreator.createPipelineChannel(1, averageElementSize, records -> AckCallbacks.inventoryCallback(records, position)); + return pipelineChannelCreator.newInstance(1, averageElementSize, new InventoryTaskAckCallback(position)); } /** @@ -84,6 +83,6 @@ public static PipelineChannel createInventoryChannel(final PipelineChannelCreato * @return channel */ public static PipelineChannel createIncrementalChannel(final int concurrency, final PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress progress) { - return pipelineChannelCreator.createPipelineChannel(concurrency, 5, records -> AckCallbacks.incrementalCallback(records, progress)); + return pipelineChannelCreator.newInstance(concurrency, 5, new IncrementalTaskAckCallback(progress)); } } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java index 3f3b70fee91cd..2146b4c1ce2fc 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelCreatorTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.channel.memory; -import org.apache.shardingsphere.data.pipeline.core.channel.AckCallback; +import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback; import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.test.util.PropertiesBuilder; @@ -46,11 +46,11 @@ void assertInitWithoutBlockQueueSize() throws Exception { @Test void assertCreateSimpleMemoryPipelineChannel() { - assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY").createPipelineChannel(1, 1, mock(AckCallback.class)), instanceOf(SimpleMemoryPipelineChannel.class)); + assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY").newInstance(1, 1, mock(PipelineChannelAckCallback.class)), instanceOf(SimpleMemoryPipelineChannel.class)); } @Test void assertCreateMultiplexMemoryPipelineChannel() { - assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY").createPipelineChannel(2, 1, mock(AckCallback.class)), instanceOf(MultiplexMemoryPipelineChannel.class)); + assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY").newInstance(2, 1, mock(PipelineChannelAckCallback.class)), instanceOf(MultiplexMemoryPipelineChannel.class)); } } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java index 96fa8ffc3b89d..a656608490a6f 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MultiplexMemoryPipelineChannelTest.java @@ -20,7 +20,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; -import org.apache.shardingsphere.data.pipeline.core.channel.AckCallback; +import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback; import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType; import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition; @@ -67,7 +67,7 @@ void assertBroadcastFinishedRecord() { } @SneakyThrows(InterruptedException.class) - private void execute(final AckCallback ackCallback, final int recordCount, final Record... records) { + private void execute(final PipelineChannelAckCallback ackCallback, final int recordCount, final Record... records) { CountDownLatch countDownLatch = new CountDownLatch(recordCount); MultiplexMemoryPipelineChannel memoryChannel = new MultiplexMemoryPipelineChannel(CHANNEL_NUMBER, 10000, ackCallback); fetchWithMultiThreads(memoryChannel, countDownLatch); diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/SimpleMemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/SimpleMemoryPipelineChannelTest.java index 6ffb309f492b6..75e9436c4baf6 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/SimpleMemoryPipelineChannelTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/SimpleMemoryPipelineChannelTest.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.core.channel.memory; import lombok.SneakyThrows; -import org.apache.shardingsphere.data.pipeline.core.channel.EmptyAckCallback; import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition; import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; @@ -37,7 +36,9 @@ class SimpleMemoryPipelineChannelTest { @SneakyThrows(InterruptedException.class) @Test void assertZeroQueueSizeWorks() { - SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(0, new EmptyAckCallback()); + SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(0, records -> { + + }); List records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition())); Thread thread = new Thread(() -> channel.push(records)); thread.start(); @@ -47,7 +48,9 @@ void assertZeroQueueSizeWorks() { @Test void assertFetchRecordsTimeoutCorrectly() { - SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10, new EmptyAckCallback()); + SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10, records -> { + + }); long startMillis = System.currentTimeMillis(); channel.fetch(1, 1, TimeUnit.MILLISECONDS); long delta = System.currentTimeMillis() - startMillis; diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index 312e2ea41a127..a629d9d9e1fc5 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -18,11 +18,10 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.core.channel.memory.SimpleMemoryPipelineChannel; import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper; -import org.apache.shardingsphere.data.pipeline.core.channel.EmptyAckCallback; -import org.apache.shardingsphere.data.pipeline.core.channel.memory.SimpleMemoryPipelineChannel; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; @@ -88,7 +87,9 @@ void setUp() throws SQLException { IncrementalDumperContext dumperContext = createDumperContext(); initTableData(dumperContext); PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class); - SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10000, new EmptyAckCallback()); + SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10000, records -> { + + }); incrementalDumper = new MySQLIncrementalDumper(dumperContext, new BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader); pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList()); when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData); diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index 02996fdb8f2e8..02d430f8eb3c9 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -18,10 +18,9 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.core.channel.memory.SimpleMemoryPipelineChannel; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.exception.IngestException; -import org.apache.shardingsphere.data.pipeline.core.channel.EmptyAckCallback; -import org.apache.shardingsphere.data.pipeline.core.channel.memory.SimpleMemoryPipelineChannel; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; @@ -83,7 +82,9 @@ class PostgreSQLWALDumperTest { @BeforeEach void setUp() { position = new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))); - channel = new SimpleMemoryPipelineChannel(10000, new EmptyAckCallback()); + channel = new SimpleMemoryPipelineChannel(10000, records -> { + + }); String jdbcUrl = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL"; String username = "root"; String password = "root";