Skip to content

Commit

Permalink
Refactor package of pipeline.channel (#29530)
Browse files Browse the repository at this point in the history
* Refactor PipelineChannelCreator

* Add ack package

* Rename PipelineChannelAckCallback

* Rename PipelineChannelAckCallback

* Remove EmptyPipelineChannelAckCallback

* Rename PipelineChannelAckCallbackUtils

* Remove PipelineChannelAckCallbackUtils

* Add IncrementalTaskAckCallback

* Add InventoryTaskAckCallback

* Refactor package of pipeline.channel

* Refactor package of pipeline.channel

* Refactor MultiplexMemoryPipelineChannel
  • Loading branch information
terrymanu authored Dec 24, 2023
1 parent a2cf783 commit a36b614
Show file tree
Hide file tree
Showing 13 changed files with 67 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import java.util.List;

/**
* Record acknowledged callback.
* Pipeline channel acknowledged callback.
*/
public interface AckCallback {
public interface PipelineChannelAckCallback {

/**
* Call after record acknowledged.
* Call after records acknowledged.
*
* @param records acknowledged record list
* @param records acknowledged records
*/
void onAck(List<Record> records);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
public interface PipelineChannelCreator extends TypedSPI {

/**
* Create pipeline channel.
* Create new instance of pipeline channel.
*
* @param outputConcurrency output concurrency
* @param averageElementSize average element size, affect the size of the queue
* @param ackCallback ack callback
* @return {@link PipelineChannel}
* @return created instance
*/
PipelineChannel createPipelineChannel(int outputConcurrency, int averageElementSize, AckCallback ackCallback);
PipelineChannel newInstance(int outputConcurrency, int averageElementSize, PipelineChannelAckCallback ackCallback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.core.channel.AckCallback;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback;

import java.util.Properties;

/**
* Memory implementation of pipeline channel creator.
* Pipeline channel creator of memory.
*/
public final class MemoryPipelineChannelCreator implements PipelineChannelCreator {

Expand All @@ -40,7 +40,7 @@ public void init(final Properties props) {
}

@Override
public PipelineChannel createPipelineChannel(final int outputConcurrency, final int averageElementSize, final AckCallback ackCallback) {
public PipelineChannel newInstance(final int outputConcurrency, final int averageElementSize, final PipelineChannelAckCallback ackCallback) {
return 1 == outputConcurrency
? new SimpleMemoryPipelineChannel(blockQueueSize / averageElementSize, ackCallback)
: new MultiplexMemoryPipelineChannel(outputConcurrency, blockQueueSize, ackCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.channel.memory;

import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.channel.AckCallback;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
Expand All @@ -38,43 +38,43 @@
*/
public final class MultiplexMemoryPipelineChannel implements PipelineChannel {

private final int channelNumber;
private final int channelCount;

private final List<PipelineChannel> channels;

private final Map<String, Integer> channelAssignment = new HashMap<>();

public MultiplexMemoryPipelineChannel(final int channelNumber, final int blockQueueSize, final AckCallback ackCallback) {
this.channelNumber = channelNumber;
public MultiplexMemoryPipelineChannel(final int channelCount, final int blockQueueSize, final PipelineChannelAckCallback ackCallback) {
this.channelCount = channelCount;
int handledQueueSize = blockQueueSize < 1 ? 5 : blockQueueSize;
channels = IntStream.range(0, channelNumber).mapToObj(each -> new SimpleMemoryPipelineChannel(handledQueueSize, ackCallback)).collect(Collectors.toList());
channels = IntStream.range(0, channelCount).mapToObj(each -> new SimpleMemoryPipelineChannel(handledQueueSize, ackCallback)).collect(Collectors.toList());
}

@Override
public void push(final List<Record> records) {
Record firstRecord = records.get(0);
if (1 == records.size()) {
pushRecord(firstRecord);
push(firstRecord);
return;
}
long insertDataRecordsCount = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count();
if (insertDataRecordsCount == records.size()) {
channels.get(Math.abs(firstRecord.hashCode() % channelNumber)).push(records);
channels.get(Math.abs(firstRecord.hashCode() % channelCount)).push(records);
return;
}
for (Record each : records) {
pushRecord(each);
push(each);
}
}

private void pushRecord(final Record ingestedRecord) {
private void push(final Record ingestedRecord) {
List<Record> records = Collections.singletonList(ingestedRecord);
if (ingestedRecord instanceof FinishedRecord) {
for (int i = 0; i < channelNumber; i++) {
for (int i = 0; i < channelCount; i++) {
channels.get(i).push(records);
}
} else if (DataRecord.class.equals(ingestedRecord.getClass())) {
channels.get(Math.abs(ingestedRecord.hashCode() % channelNumber)).push(records);
channels.get(Math.abs(ingestedRecord.hashCode() % channelCount)).push(records);
} else if (PlaceholderRecord.class.equals(ingestedRecord.getClass())) {
channels.get(0).push(records);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.channel.memory;

import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.core.channel.AckCallback;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;

Expand All @@ -37,9 +37,9 @@ public final class SimpleMemoryPipelineChannel implements PipelineChannel {

private final BlockingQueue<List<Record>> queue;

private final AckCallback ackCallback;
private final PipelineChannelAckCallback ackCallback;

public SimpleMemoryPipelineChannel(final int blockQueueSize, final AckCallback ackCallback) {
public SimpleMemoryPipelineChannel(final int blockQueueSize, final PipelineChannelAckCallback ackCallback) {
queue = blockQueueSize < 1 ? new SynchronousQueue<>(true) : new ArrayBlockingQueue<>(blockQueueSize, true);
this.ackCallback = ackCallback;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,26 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.channel;
package org.apache.shardingsphere.data.pipeline.core.task;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
* Ack callback utilities.
* Incremental task acknowledged callback.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class AckCallbacks {
@RequiredArgsConstructor
public final class IncrementalTaskAckCallback implements PipelineChannelAckCallback {

/**
* Ack callback for inventory dump.
*
* @param records record list
* @param position ingest position
*/
public static void inventoryCallback(final List<Record> records, final AtomicReference<IngestPosition> position) {
Record lastRecord = records.get(records.size() - 1);
position.set(lastRecord.getPosition());
}
private final IncrementalTaskProgress progress;

/**
* Ack callback for incremental dump.
*
* @param records record list
* @param progress incremental task progress
*/
public static void incrementalCallback(final List<Record> records, final IncrementalTaskProgress progress) {
@Override
public void onAck(final List<Record> records) {
Record lastHandledRecord = records.get(records.size() - 1);
if (!(lastHandledRecord.getPosition() instanceof IngestPlaceholderPosition)) {
progress.setPosition(lastHandledRecord.getPosition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,26 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.channel;
package org.apache.shardingsphere.data.pipeline.core.task;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
* Empty implementation of record acknowledged callback.
* Inventory task acknowledged callback.
*/
public final class EmptyAckCallback implements AckCallback {
@RequiredArgsConstructor
public final class InventoryTaskAckCallback implements PipelineChannelAckCallback {

private final AtomicReference<IngestPosition> position;

@Override
public void onAck(final List<Record> records) {
position.set(records.get(records.size() - 1).getPosition());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.channel.AckCallbacks;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;

Expand Down Expand Up @@ -72,7 +71,7 @@ public static IncrementalTaskProgress createIncrementalTaskProgress(final Ingest
* @return channel
*/
public static PipelineChannel createInventoryChannel(final PipelineChannelCreator pipelineChannelCreator, final int averageElementSize, final AtomicReference<IngestPosition> position) {
return pipelineChannelCreator.createPipelineChannel(1, averageElementSize, records -> AckCallbacks.inventoryCallback(records, position));
return pipelineChannelCreator.newInstance(1, averageElementSize, new InventoryTaskAckCallback(position));
}

/**
Expand All @@ -84,6 +83,6 @@ public static PipelineChannel createInventoryChannel(final PipelineChannelCreato
* @return channel
*/
public static PipelineChannel createIncrementalChannel(final int concurrency, final PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress progress) {
return pipelineChannelCreator.createPipelineChannel(concurrency, 5, records -> AckCallbacks.incrementalCallback(records, progress));
return pipelineChannelCreator.newInstance(concurrency, 5, new IncrementalTaskAckCallback(progress));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.core.channel.memory;

import org.apache.shardingsphere.data.pipeline.core.channel.AckCallback;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.test.util.PropertiesBuilder;
Expand Down Expand Up @@ -46,11 +46,11 @@ void assertInitWithoutBlockQueueSize() throws Exception {

@Test
void assertCreateSimpleMemoryPipelineChannel() {
assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY").createPipelineChannel(1, 1, mock(AckCallback.class)), instanceOf(SimpleMemoryPipelineChannel.class));
assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY").newInstance(1, 1, mock(PipelineChannelAckCallback.class)), instanceOf(SimpleMemoryPipelineChannel.class));
}

@Test
void assertCreateMultiplexMemoryPipelineChannel() {
assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY").createPipelineChannel(2, 1, mock(AckCallback.class)), instanceOf(MultiplexMemoryPipelineChannel.class));
assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, "MEMORY").newInstance(2, 1, mock(PipelineChannelAckCallback.class)), instanceOf(MultiplexMemoryPipelineChannel.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.core.channel.AckCallback;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelAckCallback;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
Expand Down Expand Up @@ -67,7 +67,7 @@ void assertBroadcastFinishedRecord() {
}

@SneakyThrows(InterruptedException.class)
private void execute(final AckCallback ackCallback, final int recordCount, final Record... records) {
private void execute(final PipelineChannelAckCallback ackCallback, final int recordCount, final Record... records) {
CountDownLatch countDownLatch = new CountDownLatch(recordCount);
MultiplexMemoryPipelineChannel memoryChannel = new MultiplexMemoryPipelineChannel(CHANNEL_NUMBER, 10000, ackCallback);
fetchWithMultiThreads(memoryChannel, countDownLatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.channel.memory;

import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.core.channel.EmptyAckCallback;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
Expand All @@ -37,7 +36,9 @@ class SimpleMemoryPipelineChannelTest {
@SneakyThrows(InterruptedException.class)
@Test
void assertZeroQueueSizeWorks() {
SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(0, new EmptyAckCallback());
SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(0, records -> {

});
List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
Thread thread = new Thread(() -> channel.push(records));
thread.start();
Expand All @@ -47,7 +48,9 @@ void assertZeroQueueSizeWorks() {

@Test
void assertFetchRecordsTimeoutCorrectly() {
SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10, new EmptyAckCallback());
SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10, records -> {

});
long startMillis = System.currentTimeMillis();
channel.fetch(1, 1, TimeUnit.MILLISECONDS);
long delta = System.currentTimeMillis() - startMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest;

import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.channel.memory.SimpleMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.channel.EmptyAckCallback;
import org.apache.shardingsphere.data.pipeline.core.channel.memory.SimpleMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
Expand Down Expand Up @@ -88,7 +87,9 @@ void setUp() throws SQLException {
IncrementalDumperContext dumperContext = createDumperContext();
initTableData(dumperContext);
PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class);
SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10000, new EmptyAckCallback());
SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10000, records -> {

});
incrementalDumper = new MySQLIncrementalDumper(dumperContext, new BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader);
pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList());
when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;

import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.channel.memory.SimpleMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.channel.EmptyAckCallback;
import org.apache.shardingsphere.data.pipeline.core.channel.memory.SimpleMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
Expand Down Expand Up @@ -83,7 +82,9 @@ class PostgreSQLWALDumperTest {
@BeforeEach
void setUp() {
position = new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
channel = new SimpleMemoryPipelineChannel(10000, new EmptyAckCallback());
channel = new SimpleMemoryPipelineChannel(10000, records -> {

});
String jdbcUrl = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL";
String username = "root";
String password = "root";
Expand Down

0 comments on commit a36b614

Please sign in to comment.