From 152f56c1195f324d310e794c443bec2b3be7190d Mon Sep 17 00:00:00 2001 From: azexcy <13588031592@qq.com> Date: Mon, 6 Nov 2023 15:07:11 +0800 Subject: [PATCH 1/6] Improve CDCImporter finished flag --- .../data/pipeline/cdc/core/importer/CDCImporter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java index bef90f99488e6..2a34cce00a9ea 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java @@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord; @@ -34,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType; import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId; import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter; import org.apache.shardingsphere.data.pipeline.core.importer.Importer; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; @@ -86,7 +86,7 @@ protected void runBlocking() { } else { doWithoutSorting(channelProgressPairs); } - if (channelProgressPairs.isEmpty()) { + if (channelProgressPairs.isEmpty() && ackCache.estimatedSize() == 0) { break; } } @@ -239,6 +239,7 @@ public void ack(final String ackId) { each.getLeft().getChannel().ack(Collections.singletonList(ackPosition.getLastRecord())); each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount())); } + ackCache.invalidate(ackId); } @Override From f39232e672b052aa13ee6d77d1b7bd6393f15975 Mon Sep 17 00:00:00 2001 From: azexcy <13588031592@qq.com> Date: Tue, 7 Nov 2023 14:23:11 +0800 Subject: [PATCH 2/6] Improve CDCImporter process FinishedRecord --- .../cdc/core/importer/CDCImporter.java | 23 ++++++++----------- .../cdc/core/prepare/CDCJobPreparer.java | 3 ++- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java index ff107c500ec7a..0dca1d649a806 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java @@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord; @@ -34,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType; import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId; import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter; import org.apache.shardingsphere.data.pipeline.core.importer.Importer; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; @@ -41,7 +41,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; @@ -79,23 +78,20 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme @Override protected void runBlocking() { CDCImporterManager.putImporter(this); - List channelProgressPairs = new ArrayList<>(originalChannelProgressPairs); while (isRunning()) { if (needSorting) { - doWithSorting(channelProgressPairs); + doWithSorting(originalChannelProgressPairs); } else { - doWithoutSorting(channelProgressPairs); + doWithoutSorting(originalChannelProgressPairs); } - if (channelProgressPairs.isEmpty() && ackCache.estimatedSize() == 0) { + if (originalChannelProgressPairs.isEmpty()) { break; } } } private void doWithoutSorting(final List channelProgressPairs) { - Iterator channelProgressPairsIterator = channelProgressPairs.iterator(); - while (channelProgressPairsIterator.hasNext()) { - CDCChannelProgressPair channelProgressPair = channelProgressPairsIterator.next(); + for (final CDCChannelProgressPair channelProgressPair : channelProgressPairs) { PipelineChannel channel = channelProgressPair.getChannel(); List records = channel.fetchRecords(batchSize, timeout, timeUnit).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList()); if (records.isEmpty()) { @@ -108,9 +104,6 @@ private void doWithoutSorting(final List channelProgress ackCache.put(ackId, Collections.singletonList(Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records))))); sink.write(ackId, records); Record lastRecord = records.get(records.size() - 1); - if (lastRecord instanceof FinishedRecord) { - channelProgressPairsIterator.remove(); - } if (lastRecord instanceof FinishedRecord && records.stream().noneMatch(DataRecord.class::isInstance)) { channel.ack(records); channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0)); @@ -236,7 +229,11 @@ public void ack(final String ackId) { } for (Pair each : channelPositionPairList) { CDCAckPosition ackPosition = each.getRight(); - each.getLeft().getChannel().ack(Collections.singletonList(ackPosition.getLastRecord())); + Record lastRecord = ackPosition.getLastRecord(); + each.getLeft().getChannel().ack(Collections.singletonList(lastRecord)); + if (lastRecord instanceof FinishedRecord) { + originalChannelProgressPairs.remove(each.getKey()); + } each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount())); } ackCache.invalidate(ackId); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index 3d0f982de5981..3130416367a30 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -55,6 +55,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -75,7 +76,7 @@ public final class CDCJobPreparer { public void initTasks(final Collection jobItemContexts) { // TODO Use pipeline tree to build it AtomicBoolean inventoryImporterUsed = new AtomicBoolean(); - List inventoryChannelProgressPairs = new LinkedList<>(); + List inventoryChannelProgressPairs = new CopyOnWriteArrayList<>(); AtomicBoolean incrementalImporterUsed = new AtomicBoolean(); List incrementalChannelProgressPairs = new LinkedList<>(); for (CDCJobItemContext each : jobItemContexts) { From cff349c85bf8e0055e7bdb9680ef91a8acf77a2d Mon Sep 17 00:00:00 2001 From: azexcy <13588031592@qq.com> Date: Tue, 7 Nov 2023 14:23:20 +0800 Subject: [PATCH 3/6] Add RetryStreamingExceptionHandler --- .../data/pipeline/cdc/client/CDCClient.java | 16 +---- .../cdc/client/handler/CDCRequestHandler.java | 4 +- .../cdc/client/handler/ExceptionHandler.java | 16 ++++- .../handler/LoggerExceptionHandler.java | 7 +- .../RetryStreamingExceptionHandler.java | 71 +++++++++++++++++++ .../e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 4 +- 6 files changed, 96 insertions(+), 22 deletions(-) create mode 100644 kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java index 82c3adc2eb030..bd362ee2cd7bf 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java @@ -65,6 +65,9 @@ public final class CDCClient implements AutoCloseable { public CDCClient(final CDCClientConfiguration config) { validateParameter(config); this.config = config; + if (null != config.getExceptionHandler()) { + config.getExceptionHandler().setCDCClient(this); + } } private void validateParameter(final CDCClientConfiguration parameter) { @@ -165,9 +168,6 @@ public String startStreaming(final StartStreamingParameter parameter) { * @param streamingId streaming id */ public void restartStreaming(final String streamingId) { - if (checkStreamingIdExist(streamingId)) { - stopStreaming(streamingId); - } String requestId = RequestIdUtils.generateRequestId(); StartStreamingRequestBody body = StartStreamingRequestBody.newBuilder().setStreamingId(streamingId).build(); CDCRequest request = CDCRequest.newBuilder().setRequestId(requestId).setType(Type.START_STREAMING).setStartStreamingRequestBody(body).build(); @@ -179,16 +179,6 @@ public void restartStreaming(final String streamingId) { log.info("Restart streaming success, streaming id: {}", streamingId); } - private boolean checkStreamingIdExist(final String streamingId) { - checkChannelActive(); - ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get(); - if (null == connectionContext) { - log.warn("The connection context not exist"); - return true; - } - return connectionContext.getStreamingIds().contains(streamingId); - } - /** * Stop streaming. * diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java index b991ef1a4481a..621f91af5b4b4 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java @@ -78,7 +78,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { future.countDown(); requestType = future.getRequestType(); } - exceptionHandler.handleServerException(new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType)); + exceptionHandler.handleServerException(ctx, new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType)); responseFuture.ifPresent(future -> { future.setErrorCode(response.getErrorCode()); future.setErrorMessage(response.getErrorMessage()); @@ -113,6 +113,6 @@ private void processDataRecords(final ChannelHandlerContext ctx, final DataRecor @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { - exceptionHandler.handleSocketException(cause); + exceptionHandler.handleSocketException(ctx, cause); } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java index 9664809f4727c..4c50ab1335323 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java @@ -17,6 +17,8 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.handler; +import io.netty.channel.ChannelHandlerContext; +import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; /** @@ -27,14 +29,24 @@ public interface ExceptionHandler { /** * Handle server exception. * + * @param ctx channel handler context * @param result error result */ - void handleServerException(ServerErrorResult result); + void handleServerException(ChannelHandlerContext ctx, ServerErrorResult result); /** * Handle socket exception. * + * @param ctx channel handler context * @param throwable throwable */ - void handleSocketException(Throwable throwable); + void handleSocketException(ChannelHandlerContext ctx, Throwable throwable); + + /** + * Set CDC client. + * + * @param cdcClient cdc client + */ + default void setCDCClient(CDCClient cdcClient) { + } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java index cf881ff26d88d..6a8b474a90781 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.handler; +import io.netty.channel.ChannelHandlerContext; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; @@ -26,15 +27,15 @@ */ @RequiredArgsConstructor @Slf4j -public class LoggerExceptionHandler implements ExceptionHandler { +public final class LoggerExceptionHandler implements ExceptionHandler { @Override - public void handleServerException(final ServerErrorResult result) { + public void handleServerException(final ChannelHandlerContext ctx, final ServerErrorResult result) { log.error("Server error, code: {}, message: {}", result.getErrorCode(), result.getErrorMessage()); } @Override - public void handleSocketException(final Throwable throwable) { + public void handleSocketException(final ChannelHandlerContext ctx, final Throwable throwable) { log.error("Socket error: ", throwable); } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java new file mode 100644 index 0000000000000..50186c40dfa75 --- /dev/null +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.cdc.client.handler; + +import io.netty.channel.ChannelHandlerContext; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; +import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext; +import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Retry streaming exception handler. + */ +@RequiredArgsConstructor +@Slf4j +public class RetryStreamingExceptionHandler implements ExceptionHandler { + + private final AtomicInteger retryTimes = new AtomicInteger(0); + + private CDCClient cdcClient; + + @Override + public void setCDCClient(final CDCClient cdcClient) { + this.cdcClient = cdcClient; + } + + @Override + public void handleServerException(final ChannelHandlerContext ctx, final ServerErrorResult result) { + // Server exception maybe not be resolved by retrying + log.error("Server error, code: {}, message: {}", result.getErrorCode(), result.getErrorMessage()); + } + + @SneakyThrows(InterruptedException.class) + @Override + public void handleSocketException(final ChannelHandlerContext ctx, final Throwable throwable) { + if (null == cdcClient) { + log.warn("CDC client is null, could not retry"); + return; + } + ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get(); + if (retryTimes.get() > 5) { + log.warn("Retry times exceed 5, stop streaming"); + connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.stopStreaming(each))); + return; + } + retryTimes.incrementAndGet(); + TimeUnit.SECONDS.sleep(Math.min(5 >> retryTimes.get(), 10)); + connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.restartStreaming(each))); + } +} diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java index 5f586cc8d4b47..93e096de8686f 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java @@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; @@ -169,7 +169,7 @@ private DataSource createStandardDataSource(final PipelineContainerComposer cont private void startCDCClient(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) { DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4); DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType()); - CDCClientConfiguration cdcConfig = new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new LoggerExceptionHandler()); + CDCClientConfiguration cdcConfig = new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new RetryStreamingExceptionHandler()); String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : ""; CDCClient cdcClient = new CDCClient(cdcConfig); cdcClient.connect(); From 1b9618a396167ea8081b18d968343793dcb4aba4 Mon Sep 17 00:00:00 2001 From: azexcy <13588031592@qq.com> Date: Tue, 7 Nov 2023 19:59:07 +0800 Subject: [PATCH 4/6] Add reconnect case --- .../RetryStreamingExceptionHandler.java | 25 ++++++++++++----- .../cdc/client/example/Bootstrap.java | 27 +++++++++++++------ .../netty/CDCChannelInboundHandler.java | 15 +++++++---- .../e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 10 +++---- 4 files changed, 53 insertions(+), 24 deletions(-) diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java index 50186c40dfa75..99e8bea16d20c 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java @@ -36,10 +36,17 @@ @Slf4j public class RetryStreamingExceptionHandler implements ExceptionHandler { - private final AtomicInteger retryTimes = new AtomicInteger(0); + private final AtomicInteger retryCount = new AtomicInteger(0); + + private final int retryIntervalMills; private CDCClient cdcClient; + public RetryStreamingExceptionHandler(final int retryCount, final int retryIntervalMills) { + this.retryCount.set(retryCount); + this.retryIntervalMills = retryIntervalMills; + } + @Override public void setCDCClient(final CDCClient cdcClient) { this.cdcClient = cdcClient; @@ -47,25 +54,31 @@ public void setCDCClient(final CDCClient cdcClient) { @Override public void handleServerException(final ChannelHandlerContext ctx, final ServerErrorResult result) { - // Server exception maybe not be resolved by retrying log.error("Server error, code: {}, message: {}", result.getErrorCode(), result.getErrorMessage()); + reconnect(ctx); } - @SneakyThrows(InterruptedException.class) @Override public void handleSocketException(final ChannelHandlerContext ctx, final Throwable throwable) { + log.error("Socket error: {}", throwable.getMessage()); + reconnect(ctx); + } + + @SneakyThrows(InterruptedException.class) + private void reconnect(final ChannelHandlerContext ctx) { + retryCount.incrementAndGet(); if (null == cdcClient) { log.warn("CDC client is null, could not retry"); return; } ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get(); - if (retryTimes.get() > 5) { + if (retryCount.get() > 5) { log.warn("Retry times exceed 5, stop streaming"); connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.stopStreaming(each))); return; } - retryTimes.incrementAndGet(); - TimeUnit.SECONDS.sleep(Math.min(5 >> retryTimes.get(), 10)); + TimeUnit.MILLISECONDS.sleep(retryIntervalMills); + log.info("Retry to restart streaming, retry count: {}", retryCount.get()); connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.restartStreaming(each))); } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java index f00b12a401377..69202eb9a5065 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; @@ -41,13 +41,24 @@ public static void main(final String[] args) { // Pay attention to the time zone, to avoid the problem of incorrect time zone, it is best to ensure that the time zone of the program is consistent with the time zone of the database server // TimeZone.setDefault(TimeZone.getTimeZone("UTC")); String address = "127.0.0.1"; - CDCClientConfiguration clientConfig = new CDCClientConfiguration(address, 33071, records -> log.info("records: {}", records), new LoggerExceptionHandler()); - try (CDCClient cdcClient = new CDCClient(clientConfig)) { - cdcClient.connect(); - cdcClient.login(new CDCLoginParameter("root", "root")); - String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order").build()), true)); - log.info("Streaming id={}", streamingId); - cdcClient.await(); + CDCClientConfiguration clientConfig = new CDCClientConfiguration(address, 33071, records -> log.info("records: {}", records), new RetryStreamingExceptionHandler(3, 5000)); + int reconnectCount = 0; + int maxReconnectCount = 5; + while (reconnectCount < maxReconnectCount) { + try (CDCClient cdcClient = new CDCClient(clientConfig)) { + cdcClient.connect(); + cdcClient.login(new CDCLoginParameter("root", "root")); + String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order").build()), true)); + log.info("Streaming id={}", streamingId); + cdcClient.await(); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + log.error("Exception occur: {}", ex.getMessage()); + } + Thread.sleep(5000); + log.info("Reconnect count: {}", reconnectCount); + reconnectCount++; } } } diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java index f9f6063f5a8bc..88ded2b79edf1 100644 --- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java @@ -44,12 +44,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException; import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion; -import org.apache.shardingsphere.infra.executor.audit.exception.SQLAuditException; -import org.apache.shardingsphere.infra.metadata.user.Grantee; -import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.exception.core.external.sql.ShardingSphereSQLException; import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState; +import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException; +import org.apache.shardingsphere.infra.executor.audit.exception.SQLAuditException; +import org.apache.shardingsphere.infra.metadata.user.Grantee; +import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser; import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import java.net.InetSocketAddress; @@ -172,8 +173,12 @@ private void processStreamDataRequest(final ChannelHandlerContext ctx, final CDC throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Source schema table is empty")); } checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), requestBody.getDatabase()); - CDCResponse response = backendHandler.streamData(request.getRequestId(), requestBody, connectionContext, ctx.channel()); - ctx.writeAndFlush(response); + try { + CDCResponse response = backendHandler.streamData(request.getRequestId(), requestBody, connectionContext, ctx.channel()); + ctx.writeAndFlush(response); + } catch (final PipelineSQLException ex) { + throw new CDCExceptionWrapper(request.getRequestId(), ex); + } } private void processAckStreamingRequest(final CDCRequest request) { diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java index b0192fc9a52bf..dd2b2d8bbcce5 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java @@ -20,19 +20,19 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; -import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; -import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; +import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader; +import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; +import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; @@ -169,7 +169,7 @@ private DataSource createStandardDataSource(final PipelineContainerComposer cont private void startCDCClient(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) { DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4); DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType()); - CDCClientConfiguration cdcConfig = new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new RetryStreamingExceptionHandler()); + CDCClientConfiguration cdcConfig = new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new LoggerExceptionHandler()); String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : ""; CDCClient cdcClient = new CDCClient(cdcConfig); cdcClient.connect(); From 9cbbfcb7a63d0f276be60e97da7df5348e9498d2 Mon Sep 17 00:00:00 2001 From: azexcy <13588031592@qq.com> Date: Tue, 7 Nov 2023 21:28:50 +0800 Subject: [PATCH 5/6] Refactor cdc exception handler --- .../data/pipeline/cdc/client/CDCClient.java | 17 ++++++--- .../client/config/CDCClientConfiguration.java | 17 --------- .../cdc/client/handler/CDCRequestHandler.java | 6 ++-- .../cdc/client/handler/ExceptionHandler.java | 22 ++---------- ....java => LoggerExceptionErrorHandler.java} | 10 +++--- .../RetryStreamingExceptionHandler.java | 34 +++++++----------- .../handler/ServerErrorResultHandler.java | 35 +++++++++++++++++++ .../cdc/client/example/Bootstrap.java | 9 ++--- .../e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 8 ++--- 9 files changed, 79 insertions(+), 79 deletions(-) rename kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/{LoggerExceptionHandler.java => LoggerExceptionErrorHandler.java} (76%) create mode 100644 kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ServerErrorResultHandler.java diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java index bd362ee2cd7bf..e1b31c2decc73 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java @@ -35,6 +35,8 @@ import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus; import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext; import org.apache.shardingsphere.data.pipeline.cdc.client.handler.CDCRequestHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ExceptionHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ServerErrorResultHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtils; @@ -49,6 +51,10 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody; import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record; + +import java.util.List; +import java.util.function.Consumer; /** * CDC client. @@ -65,9 +71,6 @@ public final class CDCClient implements AutoCloseable { public CDCClient(final CDCClientConfiguration config) { validateParameter(config); this.config = config; - if (null != config.getExceptionHandler()) { - config.getExceptionHandler().setCDCClient(this); - } } private void validateParameter(final CDCClientConfiguration parameter) { @@ -81,9 +84,13 @@ private void validateParameter(final CDCClientConfiguration parameter) { /** * Connect. + * + * @param dataConsumer data consumer + * @param exceptionHandler exception handler + * @param errorResultHandler error result handler */ @SneakyThrows(InterruptedException.class) - public void connect() { + public void connect(final Consumer> dataConsumer, final ExceptionHandler exceptionHandler, final ServerErrorResultHandler errorResultHandler) { Bootstrap bootstrap = new Bootstrap(); group = new NioEventLoopGroup(1); bootstrap.channel(NioSocketChannel.class) @@ -98,7 +105,7 @@ protected void initChannel(final NioSocketChannel channel) { channel.pipeline().addLast(new ProtobufDecoder(CDCResponse.getDefaultInstance())); channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); channel.pipeline().addLast(new ProtobufEncoder()); - channel.pipeline().addLast(new CDCRequestHandler(config.getDataConsumer(), config.getExceptionHandler())); + channel.pipeline().addLast(new CDCRequestHandler(dataConsumer, exceptionHandler, errorResultHandler)); } }); channel = bootstrap.connect(config.getAddress(), config.getPort()).sync().channel(); diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java index 99c7de6942061..6b2b5a3900288 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java @@ -19,11 +19,6 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ExceptionHandler; -import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record; - -import java.util.List; -import java.util.function.Consumer; /** * CDC client configuration. @@ -36,17 +31,5 @@ public final class CDCClientConfiguration { private final int port; - private final Consumer> dataConsumer; - - private final ExceptionHandler exceptionHandler; - private final int timeoutMills; - - public CDCClientConfiguration(final String address, final int port, final Consumer> dataConsumer, final ExceptionHandler exceptionHandler) { - this.address = address; - this.port = port; - this.dataConsumer = dataConsumer; - this.exceptionHandler = exceptionHandler; - this.timeoutMills = 5000; - } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java index 621f91af5b4b4..630cc41916e61 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java @@ -50,6 +50,8 @@ public final class CDCRequestHandler extends ChannelInboundHandlerAdapter { private final ExceptionHandler exceptionHandler; + private final ServerErrorResultHandler errorResultHandler; + @Override public void channelRegistered(final ChannelHandlerContext ctx) { ClientConnectionContext context = new ClientConnectionContext(); @@ -78,7 +80,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { future.countDown(); requestType = future.getRequestType(); } - exceptionHandler.handleServerException(ctx, new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType)); + errorResultHandler.handleServerError(ctx, new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType)); responseFuture.ifPresent(future -> { future.setErrorCode(response.getErrorCode()); future.setErrorMessage(response.getErrorMessage()); @@ -113,6 +115,6 @@ private void processDataRecords(final ChannelHandlerContext ctx, final DataRecor @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { - exceptionHandler.handleSocketException(ctx, cause); + exceptionHandler.handleException(ctx, cause); } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java index 4c50ab1335323..3867c8b162362 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java @@ -18,8 +18,6 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.handler; import io.netty.channel.ChannelHandlerContext; -import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; -import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; /** * Exception handler. @@ -27,26 +25,10 @@ public interface ExceptionHandler { /** - * Handle server exception. - * - * @param ctx channel handler context - * @param result error result - */ - void handleServerException(ChannelHandlerContext ctx, ServerErrorResult result); - - /** - * Handle socket exception. + * Handle exception. * * @param ctx channel handler context * @param throwable throwable */ - void handleSocketException(ChannelHandlerContext ctx, Throwable throwable); - - /** - * Set CDC client. - * - * @param cdcClient cdc client - */ - default void setCDCClient(CDCClient cdcClient) { - } + void handleException(ChannelHandlerContext ctx, Throwable throwable); } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionErrorHandler.java similarity index 76% rename from kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java rename to kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionErrorHandler.java index 6a8b474a90781..1ea3bea3dc7fc 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionErrorHandler.java @@ -23,19 +23,19 @@ import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; /** - * Logger exception handler, just print error. + * Logger exception error handler. */ @RequiredArgsConstructor @Slf4j -public final class LoggerExceptionHandler implements ExceptionHandler { +public final class LoggerExceptionErrorHandler implements ExceptionHandler, ServerErrorResultHandler { @Override - public void handleServerException(final ChannelHandlerContext ctx, final ServerErrorResult result) { + public void handleServerError(final ChannelHandlerContext ctx, final ServerErrorResult result) { log.error("Server error, code: {}, message: {}", result.getErrorCode(), result.getErrorMessage()); } @Override - public void handleSocketException(final ChannelHandlerContext ctx, final Throwable throwable) { - log.error("Socket error: ", throwable); + public void handleException(final ChannelHandlerContext ctx, final Throwable throwable) { + log.error("Exception error: ", throwable); } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java index 99e8bea16d20c..a495db77b6f0c 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java @@ -18,12 +18,10 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.handler; import io.netty.channel.ChannelHandlerContext; -import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext; -import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -32,53 +30,45 @@ /** * Retry streaming exception handler. */ -@RequiredArgsConstructor @Slf4j public class RetryStreamingExceptionHandler implements ExceptionHandler { - private final AtomicInteger retryCount = new AtomicInteger(0); + private final CDCClient cdcClient; + + private final AtomicInteger maxRetryTimes = new AtomicInteger(0); private final int retryIntervalMills; - private CDCClient cdcClient; + private final int retryTimes; - public RetryStreamingExceptionHandler(final int retryCount, final int retryIntervalMills) { - this.retryCount.set(retryCount); - this.retryIntervalMills = retryIntervalMills; - } - - @Override - public void setCDCClient(final CDCClient cdcClient) { + public RetryStreamingExceptionHandler(final CDCClient cdcClient, final int maxRetryTimes, final int retryIntervalMills) { this.cdcClient = cdcClient; + this.maxRetryTimes.set(maxRetryTimes); + this.retryIntervalMills = retryIntervalMills; + retryTimes = 0; } @Override - public void handleServerException(final ChannelHandlerContext ctx, final ServerErrorResult result) { - log.error("Server error, code: {}, message: {}", result.getErrorCode(), result.getErrorMessage()); - reconnect(ctx); - } - - @Override - public void handleSocketException(final ChannelHandlerContext ctx, final Throwable throwable) { + public void handleException(final ChannelHandlerContext ctx, final Throwable throwable) { log.error("Socket error: {}", throwable.getMessage()); reconnect(ctx); } @SneakyThrows(InterruptedException.class) private void reconnect(final ChannelHandlerContext ctx) { - retryCount.incrementAndGet(); + maxRetryTimes.incrementAndGet(); if (null == cdcClient) { log.warn("CDC client is null, could not retry"); return; } ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get(); - if (retryCount.get() > 5) { + if (retryTimes > maxRetryTimes.get()) { log.warn("Retry times exceed 5, stop streaming"); connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.stopStreaming(each))); return; } TimeUnit.MILLISECONDS.sleep(retryIntervalMills); - log.info("Retry to restart streaming, retry count: {}", retryCount.get()); + log.info("Retry to restart streaming, retry count: {}", maxRetryTimes.get()); connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.restartStreaming(each))); } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ServerErrorResultHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ServerErrorResultHandler.java new file mode 100644 index 0000000000000..4bebbce69ae9d --- /dev/null +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ServerErrorResultHandler.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.cdc.client.handler; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; + +/** + * Server error result handler. + */ +public interface ServerErrorResultHandler { + + /** + * Handle server ERROR. + * + * @param ctx channel handler context + * @param result error result + */ + void handleServerError(ChannelHandlerContext ctx, ServerErrorResult result); +} diff --git a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java index 69202eb9a5065..0a8e2591792b5 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionErrorHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; @@ -41,14 +42,14 @@ public static void main(final String[] args) { // Pay attention to the time zone, to avoid the problem of incorrect time zone, it is best to ensure that the time zone of the program is consistent with the time zone of the database server // TimeZone.setDefault(TimeZone.getTimeZone("UTC")); String address = "127.0.0.1"; - CDCClientConfiguration clientConfig = new CDCClientConfiguration(address, 33071, records -> log.info("records: {}", records), new RetryStreamingExceptionHandler(3, 5000)); int reconnectCount = 0; int maxReconnectCount = 5; while (reconnectCount < maxReconnectCount) { - try (CDCClient cdcClient = new CDCClient(clientConfig)) { - cdcClient.connect(); + try (CDCClient cdcClient = new CDCClient(new CDCClientConfiguration(address, 33071, 5000))) { + LoggerExceptionErrorHandler loggerExceptionErrorHandler = new LoggerExceptionErrorHandler(); + cdcClient.connect(records -> log.info("records: {}", records), new RetryStreamingExceptionHandler(cdcClient, 5, 5000), loggerExceptionErrorHandler); cdcClient.login(new CDCLoginParameter("root", "root")); - String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order").build()), true)); + String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order123").build()), true)); log.info("Streaming id={}", streamingId); cdcClient.await(); // CHECKSTYLE:OFF diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java index dd2b2d8bbcce5..53db08e4b2961 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java @@ -23,7 +23,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionErrorHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; @@ -169,10 +169,10 @@ private DataSource createStandardDataSource(final PipelineContainerComposer cont private void startCDCClient(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) { DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4); DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType()); - CDCClientConfiguration cdcConfig = new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new LoggerExceptionHandler()); String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : ""; - CDCClient cdcClient = new CDCClient(cdcConfig); - cdcClient.connect(); + CDCClient cdcClient = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000)); + LoggerExceptionErrorHandler handler = new LoggerExceptionErrorHandler(); + cdcClient.connect(recordConsumer, handler, handler); cdcClient.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)); // TODO add full=false test case later cdcClient.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(), From 33b1c191d790118c049462fa0489ce3ebfd286ac Mon Sep 17 00:00:00 2001 From: azexcy <13588031592@qq.com> Date: Wed, 8 Nov 2023 10:56:12 +0800 Subject: [PATCH 6/6] Improve parameter and error log --- .../handler/LoggerExceptionErrorHandler.java | 41 ------------------- .../RetryStreamingExceptionHandler.java | 23 ++++------- .../cdc/client/example/Bootstrap.java | 26 ++++-------- .../e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 5 +-- 4 files changed, 18 insertions(+), 77 deletions(-) delete mode 100644 kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionErrorHandler.java diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionErrorHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionErrorHandler.java deleted file mode 100644 index 1ea3bea3dc7fc..0000000000000 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionErrorHandler.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.cdc.client.handler; - -import io.netty.channel.ChannelHandlerContext; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; - -/** - * Logger exception error handler. - */ -@RequiredArgsConstructor -@Slf4j -public final class LoggerExceptionErrorHandler implements ExceptionHandler, ServerErrorResultHandler { - - @Override - public void handleServerError(final ChannelHandlerContext ctx, final ServerErrorResult result) { - log.error("Server error, code: {}, message: {}", result.getErrorCode(), result.getErrorMessage()); - } - - @Override - public void handleException(final ChannelHandlerContext ctx, final Throwable throwable) { - log.error("Exception error: ", throwable); - } -} diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java index a495db77b6f0c..7efa76da1953a 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java @@ -31,44 +31,39 @@ * Retry streaming exception handler. */ @Slf4j -public class RetryStreamingExceptionHandler implements ExceptionHandler { +public final class RetryStreamingExceptionHandler implements ExceptionHandler { private final CDCClient cdcClient; - private final AtomicInteger maxRetryTimes = new AtomicInteger(0); + private final AtomicInteger maxRetryTimes; private final int retryIntervalMills; - private final int retryTimes; + private final AtomicInteger retryTimes = new AtomicInteger(0); public RetryStreamingExceptionHandler(final CDCClient cdcClient, final int maxRetryTimes, final int retryIntervalMills) { this.cdcClient = cdcClient; - this.maxRetryTimes.set(maxRetryTimes); + this.maxRetryTimes = new AtomicInteger(maxRetryTimes); this.retryIntervalMills = retryIntervalMills; - retryTimes = 0; } @Override public void handleException(final ChannelHandlerContext ctx, final Throwable throwable) { - log.error("Socket error: {}", throwable.getMessage()); + log.error("Catch exception: ", throwable); reconnect(ctx); } @SneakyThrows(InterruptedException.class) private void reconnect(final ChannelHandlerContext ctx) { - maxRetryTimes.incrementAndGet(); - if (null == cdcClient) { - log.warn("CDC client is null, could not retry"); - return; - } + retryTimes.incrementAndGet(); ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get(); - if (retryTimes > maxRetryTimes.get()) { - log.warn("Retry times exceed 5, stop streaming"); + if (retryTimes.get() > maxRetryTimes.get()) { + log.warn("Stop try to reconnect, stop streaming ids: {}", connectionContext.getStreamingIds()); connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.stopStreaming(each))); return; } TimeUnit.MILLISECONDS.sleep(retryIntervalMills); - log.info("Retry to restart streaming, retry count: {}", maxRetryTimes.get()); + log.info("Retry to restart streaming, retry times: {}", retryTimes.get()); connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.restartStreaming(each))); } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java index 0a8e2591792b5..5b6b9ba0f07de 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java @@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionErrorHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; @@ -42,24 +41,13 @@ public static void main(final String[] args) { // Pay attention to the time zone, to avoid the problem of incorrect time zone, it is best to ensure that the time zone of the program is consistent with the time zone of the database server // TimeZone.setDefault(TimeZone.getTimeZone("UTC")); String address = "127.0.0.1"; - int reconnectCount = 0; - int maxReconnectCount = 5; - while (reconnectCount < maxReconnectCount) { - try (CDCClient cdcClient = new CDCClient(new CDCClientConfiguration(address, 33071, 5000))) { - LoggerExceptionErrorHandler loggerExceptionErrorHandler = new LoggerExceptionErrorHandler(); - cdcClient.connect(records -> log.info("records: {}", records), new RetryStreamingExceptionHandler(cdcClient, 5, 5000), loggerExceptionErrorHandler); - cdcClient.login(new CDCLoginParameter("root", "root")); - String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order123").build()), true)); - log.info("Streaming id={}", streamingId); - cdcClient.await(); - // CHECKSTYLE:OFF - } catch (final Exception ex) { - // CHECKSTYLE:ON - log.error("Exception occur: {}", ex.getMessage()); - } - Thread.sleep(5000); - log.info("Reconnect count: {}", reconnectCount); - reconnectCount++; + try (CDCClient cdcClient = new CDCClient(new CDCClientConfiguration(address, 33071, 10000))) { + cdcClient.connect(records -> log.info("records: {}", records), new RetryStreamingExceptionHandler(cdcClient, 5, 5000), + (ctx, result) -> log.error("Server error: {}", result.getErrorMessage())); + cdcClient.login(new CDCLoginParameter("root", "root")); + String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order").build()), true)); + log.info("Streaming id={}", streamingId); + cdcClient.await(); } } } diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java index 53db08e4b2961..efc2ca5f19253 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java @@ -23,7 +23,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionErrorHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; @@ -171,8 +171,7 @@ private void startCDCClient(final PipelineContainerComposer containerComposer, f DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType()); String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : ""; CDCClient cdcClient = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000)); - LoggerExceptionErrorHandler handler = new LoggerExceptionErrorHandler(); - cdcClient.connect(recordConsumer, handler, handler); + cdcClient.connect(recordConsumer, new RetryStreamingExceptionHandler(cdcClient, 5, 5000), (ctx, result) -> log.error("Server error: {}", result.getErrorMessage())); cdcClient.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)); // TODO add full=false test case later cdcClient.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),