diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java index 82c3adc2eb030..bd362ee2cd7bf 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java @@ -65,6 +65,9 @@ public final class CDCClient implements AutoCloseable { public CDCClient(final CDCClientConfiguration config) { validateParameter(config); this.config = config; + if (null != config.getExceptionHandler()) { + config.getExceptionHandler().setCDCClient(this); + } } private void validateParameter(final CDCClientConfiguration parameter) { @@ -165,9 +168,6 @@ public String startStreaming(final StartStreamingParameter parameter) { * @param streamingId streaming id */ public void restartStreaming(final String streamingId) { - if (checkStreamingIdExist(streamingId)) { - stopStreaming(streamingId); - } String requestId = RequestIdUtils.generateRequestId(); StartStreamingRequestBody body = StartStreamingRequestBody.newBuilder().setStreamingId(streamingId).build(); CDCRequest request = CDCRequest.newBuilder().setRequestId(requestId).setType(Type.START_STREAMING).setStartStreamingRequestBody(body).build(); @@ -179,16 +179,6 @@ public void restartStreaming(final String streamingId) { log.info("Restart streaming success, streaming id: {}", streamingId); } - private boolean checkStreamingIdExist(final String streamingId) { - checkChannelActive(); - ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get(); - if (null == connectionContext) { - log.warn("The connection context not exist"); - return true; - } - return connectionContext.getStreamingIds().contains(streamingId); - } - /** * Stop streaming. * diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java index b991ef1a4481a..621f91af5b4b4 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java @@ -78,7 +78,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { future.countDown(); requestType = future.getRequestType(); } - exceptionHandler.handleServerException(new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType)); + exceptionHandler.handleServerException(ctx, new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType)); responseFuture.ifPresent(future -> { future.setErrorCode(response.getErrorCode()); future.setErrorMessage(response.getErrorMessage()); @@ -113,6 +113,6 @@ private void processDataRecords(final ChannelHandlerContext ctx, final DataRecor @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { - exceptionHandler.handleSocketException(cause); + exceptionHandler.handleSocketException(ctx, cause); } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java index 9664809f4727c..4c50ab1335323 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java @@ -17,6 +17,8 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.handler; +import io.netty.channel.ChannelHandlerContext; +import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; /** @@ -27,14 +29,24 @@ public interface ExceptionHandler { /** * Handle server exception. * + * @param ctx channel handler context * @param result error result */ - void handleServerException(ServerErrorResult result); + void handleServerException(ChannelHandlerContext ctx, ServerErrorResult result); /** * Handle socket exception. * + * @param ctx channel handler context * @param throwable throwable */ - void handleSocketException(Throwable throwable); + void handleSocketException(ChannelHandlerContext ctx, Throwable throwable); + + /** + * Set CDC client. + * + * @param cdcClient cdc client + */ + default void setCDCClient(CDCClient cdcClient) { + } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java index cf881ff26d88d..6a8b474a90781 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.handler; +import io.netty.channel.ChannelHandlerContext; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; @@ -26,15 +27,15 @@ */ @RequiredArgsConstructor @Slf4j -public class LoggerExceptionHandler implements ExceptionHandler { +public final class LoggerExceptionHandler implements ExceptionHandler { @Override - public void handleServerException(final ServerErrorResult result) { + public void handleServerException(final ChannelHandlerContext ctx, final ServerErrorResult result) { log.error("Server error, code: {}, message: {}", result.getErrorCode(), result.getErrorMessage()); } @Override - public void handleSocketException(final Throwable throwable) { + public void handleSocketException(final ChannelHandlerContext ctx, final Throwable throwable) { log.error("Socket error: ", throwable); } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java new file mode 100644 index 0000000000000..50186c40dfa75 --- /dev/null +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.cdc.client.handler; + +import io.netty.channel.ChannelHandlerContext; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; +import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext; +import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Retry streaming exception handler. + */ +@RequiredArgsConstructor +@Slf4j +public class RetryStreamingExceptionHandler implements ExceptionHandler { + + private final AtomicInteger retryTimes = new AtomicInteger(0); + + private CDCClient cdcClient; + + @Override + public void setCDCClient(final CDCClient cdcClient) { + this.cdcClient = cdcClient; + } + + @Override + public void handleServerException(final ChannelHandlerContext ctx, final ServerErrorResult result) { + // Server exception maybe not be resolved by retrying + log.error("Server error, code: {}, message: {}", result.getErrorCode(), result.getErrorMessage()); + } + + @SneakyThrows(InterruptedException.class) + @Override + public void handleSocketException(final ChannelHandlerContext ctx, final Throwable throwable) { + if (null == cdcClient) { + log.warn("CDC client is null, could not retry"); + return; + } + ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get(); + if (retryTimes.get() > 5) { + log.warn("Retry times exceed 5, stop streaming"); + connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.stopStreaming(each))); + return; + } + retryTimes.incrementAndGet(); + TimeUnit.SECONDS.sleep(Math.min(5 >> retryTimes.get(), 10)); + connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.restartStreaming(each))); + } +} diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java index 5f586cc8d4b47..93e096de8686f 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java @@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; @@ -169,7 +169,7 @@ private DataSource createStandardDataSource(final PipelineContainerComposer cont private void startCDCClient(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) { DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4); DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType()); - CDCClientConfiguration cdcConfig = new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new LoggerExceptionHandler()); + CDCClientConfiguration cdcConfig = new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new RetryStreamingExceptionHandler()); String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : ""; CDCClient cdcClient = new CDCClient(cdcConfig); cdcClient.connect();