diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java index 82c3adc2eb030..e1b31c2decc73 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java @@ -35,6 +35,8 @@ import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus; import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext; import org.apache.shardingsphere.data.pipeline.cdc.client.handler.CDCRequestHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ExceptionHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ServerErrorResultHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtils; @@ -49,6 +51,10 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody; import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record; + +import java.util.List; +import java.util.function.Consumer; /** * CDC client. @@ -78,9 +84,13 @@ private void validateParameter(final CDCClientConfiguration parameter) { /** * Connect. + * + * @param dataConsumer data consumer + * @param exceptionHandler exception handler + * @param errorResultHandler error result handler */ @SneakyThrows(InterruptedException.class) - public void connect() { + public void connect(final Consumer> dataConsumer, final ExceptionHandler exceptionHandler, final ServerErrorResultHandler errorResultHandler) { Bootstrap bootstrap = new Bootstrap(); group = new NioEventLoopGroup(1); bootstrap.channel(NioSocketChannel.class) @@ -95,7 +105,7 @@ protected void initChannel(final NioSocketChannel channel) { channel.pipeline().addLast(new ProtobufDecoder(CDCResponse.getDefaultInstance())); channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); channel.pipeline().addLast(new ProtobufEncoder()); - channel.pipeline().addLast(new CDCRequestHandler(config.getDataConsumer(), config.getExceptionHandler())); + channel.pipeline().addLast(new CDCRequestHandler(dataConsumer, exceptionHandler, errorResultHandler)); } }); channel = bootstrap.connect(config.getAddress(), config.getPort()).sync().channel(); @@ -165,9 +175,6 @@ public String startStreaming(final StartStreamingParameter parameter) { * @param streamingId streaming id */ public void restartStreaming(final String streamingId) { - if (checkStreamingIdExist(streamingId)) { - stopStreaming(streamingId); - } String requestId = RequestIdUtils.generateRequestId(); StartStreamingRequestBody body = StartStreamingRequestBody.newBuilder().setStreamingId(streamingId).build(); CDCRequest request = CDCRequest.newBuilder().setRequestId(requestId).setType(Type.START_STREAMING).setStartStreamingRequestBody(body).build(); @@ -179,16 +186,6 @@ public void restartStreaming(final String streamingId) { log.info("Restart streaming success, streaming id: {}", streamingId); } - private boolean checkStreamingIdExist(final String streamingId) { - checkChannelActive(); - ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get(); - if (null == connectionContext) { - log.warn("The connection context not exist"); - return true; - } - return connectionContext.getStreamingIds().contains(streamingId); - } - /** * Stop streaming. * diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java index 99c7de6942061..6b2b5a3900288 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java @@ -19,11 +19,6 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ExceptionHandler; -import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record; - -import java.util.List; -import java.util.function.Consumer; /** * CDC client configuration. @@ -36,17 +31,5 @@ public final class CDCClientConfiguration { private final int port; - private final Consumer> dataConsumer; - - private final ExceptionHandler exceptionHandler; - private final int timeoutMills; - - public CDCClientConfiguration(final String address, final int port, final Consumer> dataConsumer, final ExceptionHandler exceptionHandler) { - this.address = address; - this.port = port; - this.dataConsumer = dataConsumer; - this.exceptionHandler = exceptionHandler; - this.timeoutMills = 5000; - } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java index b991ef1a4481a..630cc41916e61 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java @@ -50,6 +50,8 @@ public final class CDCRequestHandler extends ChannelInboundHandlerAdapter { private final ExceptionHandler exceptionHandler; + private final ServerErrorResultHandler errorResultHandler; + @Override public void channelRegistered(final ChannelHandlerContext ctx) { ClientConnectionContext context = new ClientConnectionContext(); @@ -78,7 +80,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { future.countDown(); requestType = future.getRequestType(); } - exceptionHandler.handleServerException(new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType)); + errorResultHandler.handleServerError(ctx, new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType)); responseFuture.ifPresent(future -> { future.setErrorCode(response.getErrorCode()); future.setErrorMessage(response.getErrorMessage()); @@ -113,6 +115,6 @@ private void processDataRecords(final ChannelHandlerContext ctx, final DataRecor @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { - exceptionHandler.handleSocketException(cause); + exceptionHandler.handleException(ctx, cause); } } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java index 9664809f4727c..3867c8b162362 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.handler; -import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; +import io.netty.channel.ChannelHandlerContext; /** * Exception handler. @@ -25,16 +25,10 @@ public interface ExceptionHandler { /** - * Handle server exception. - * - * @param result error result - */ - void handleServerException(ServerErrorResult result); - - /** - * Handle socket exception. + * Handle exception. * + * @param ctx channel handler context * @param throwable throwable */ - void handleSocketException(Throwable throwable); + void handleException(ChannelHandlerContext ctx, Throwable throwable); } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java new file mode 100644 index 0000000000000..7efa76da1953a --- /dev/null +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.cdc.client.handler; + +import io.netty.channel.ChannelHandlerContext; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; +import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Retry streaming exception handler. + */ +@Slf4j +public final class RetryStreamingExceptionHandler implements ExceptionHandler { + + private final CDCClient cdcClient; + + private final AtomicInteger maxRetryTimes; + + private final int retryIntervalMills; + + private final AtomicInteger retryTimes = new AtomicInteger(0); + + public RetryStreamingExceptionHandler(final CDCClient cdcClient, final int maxRetryTimes, final int retryIntervalMills) { + this.cdcClient = cdcClient; + this.maxRetryTimes = new AtomicInteger(maxRetryTimes); + this.retryIntervalMills = retryIntervalMills; + } + + @Override + public void handleException(final ChannelHandlerContext ctx, final Throwable throwable) { + log.error("Catch exception: ", throwable); + reconnect(ctx); + } + + @SneakyThrows(InterruptedException.class) + private void reconnect(final ChannelHandlerContext ctx) { + retryTimes.incrementAndGet(); + ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get(); + if (retryTimes.get() > maxRetryTimes.get()) { + log.warn("Stop try to reconnect, stop streaming ids: {}", connectionContext.getStreamingIds()); + connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.stopStreaming(each))); + return; + } + TimeUnit.MILLISECONDS.sleep(retryIntervalMills); + log.info("Retry to restart streaming, retry times: {}", retryTimes.get()); + connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.restartStreaming(each))); + } +} diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ServerErrorResultHandler.java similarity index 63% rename from kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java rename to kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ServerErrorResultHandler.java index cf881ff26d88d..4bebbce69ae9d 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ServerErrorResultHandler.java @@ -17,24 +17,19 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.handler; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; +import io.netty.channel.ChannelHandlerContext; import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult; /** - * Logger exception handler, just print error. + * Server error result handler. */ -@RequiredArgsConstructor -@Slf4j -public class LoggerExceptionHandler implements ExceptionHandler { +public interface ServerErrorResultHandler { - @Override - public void handleServerException(final ServerErrorResult result) { - log.error("Server error, code: {}, message: {}", result.getErrorCode(), result.getErrorMessage()); - } - - @Override - public void handleSocketException(final Throwable throwable) { - log.error("Socket error: ", throwable); - } + /** + * Handle server ERROR. + * + * @param ctx channel handler context + * @param result error result + */ + void handleServerError(ChannelHandlerContext ctx, ServerErrorResult result); } diff --git a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java index f00b12a401377..5b6b9ba0f07de 100644 --- a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java +++ b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; @@ -41,9 +41,9 @@ public static void main(final String[] args) { // Pay attention to the time zone, to avoid the problem of incorrect time zone, it is best to ensure that the time zone of the program is consistent with the time zone of the database server // TimeZone.setDefault(TimeZone.getTimeZone("UTC")); String address = "127.0.0.1"; - CDCClientConfiguration clientConfig = new CDCClientConfiguration(address, 33071, records -> log.info("records: {}", records), new LoggerExceptionHandler()); - try (CDCClient cdcClient = new CDCClient(clientConfig)) { - cdcClient.connect(); + try (CDCClient cdcClient = new CDCClient(new CDCClientConfiguration(address, 33071, 10000))) { + cdcClient.connect(records -> log.info("records: {}", records), new RetryStreamingExceptionHandler(cdcClient, 5, 5000), + (ctx, result) -> log.error("Server error: {}", result.getErrorMessage())); cdcClient.login(new CDCLoginParameter("root", "root")); String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order").build()), true)); log.info("Streaming id={}", streamingId); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java index 15c2e3a571516..305ec2b7f35f6 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java @@ -41,7 +41,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; @@ -79,23 +78,20 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme @Override protected void runBlocking() { CDCImporterManager.putImporter(this); - List channelProgressPairs = new ArrayList<>(originalChannelProgressPairs); while (isRunning()) { if (needSorting) { - doWithSorting(channelProgressPairs); + doWithSorting(originalChannelProgressPairs); } else { - doWithoutSorting(channelProgressPairs); + doWithoutSorting(originalChannelProgressPairs); } - if (channelProgressPairs.isEmpty()) { + if (originalChannelProgressPairs.isEmpty()) { break; } } } private void doWithoutSorting(final List channelProgressPairs) { - Iterator channelProgressPairsIterator = channelProgressPairs.iterator(); - while (channelProgressPairsIterator.hasNext()) { - CDCChannelProgressPair channelProgressPair = channelProgressPairsIterator.next(); + for (final CDCChannelProgressPair channelProgressPair : channelProgressPairs) { PipelineChannel channel = channelProgressPair.getChannel(); List records = channel.fetchRecords(batchSize, timeout, timeUnit).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList()); if (records.isEmpty()) { @@ -108,9 +104,6 @@ private void doWithoutSorting(final List channelProgress ackCache.put(ackId, Collections.singletonList(Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records))))); sink.write(ackId, records); Record lastRecord = records.get(records.size() - 1); - if (lastRecord instanceof FinishedRecord) { - channelProgressPairsIterator.remove(); - } if (lastRecord instanceof FinishedRecord && records.stream().noneMatch(DataRecord.class::isInstance)) { channel.ack(records); channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0)); @@ -236,9 +229,14 @@ public void ack(final String ackId) { } for (Pair each : channelPositionPairList) { CDCAckPosition ackPosition = each.getRight(); - each.getLeft().getChannel().ack(Collections.singletonList(ackPosition.getLastRecord())); + Record lastRecord = ackPosition.getLastRecord(); + each.getLeft().getChannel().ack(Collections.singletonList(lastRecord)); + if (lastRecord instanceof FinishedRecord) { + originalChannelProgressPairs.remove(each.getKey()); + } each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount())); } + ackCache.invalidate(ackId); } @Override diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index 25d6c7db4070a..681307a84c1a0 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -55,6 +55,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -75,7 +76,7 @@ public final class CDCJobPreparer { public void initTasks(final Collection jobItemContexts) { // TODO Use pipeline tree to build it AtomicBoolean inventoryImporterUsed = new AtomicBoolean(); - List inventoryChannelProgressPairs = new LinkedList<>(); + List inventoryChannelProgressPairs = new CopyOnWriteArrayList<>(); AtomicBoolean incrementalImporterUsed = new AtomicBoolean(); List incrementalChannelProgressPairs = new LinkedList<>(); for (CDCJobItemContext each : jobItemContexts) { diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java index f9f6063f5a8bc..88ded2b79edf1 100644 --- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java @@ -44,12 +44,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException; import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion; -import org.apache.shardingsphere.infra.executor.audit.exception.SQLAuditException; -import org.apache.shardingsphere.infra.metadata.user.Grantee; -import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.exception.core.external.sql.ShardingSphereSQLException; import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState; +import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException; +import org.apache.shardingsphere.infra.executor.audit.exception.SQLAuditException; +import org.apache.shardingsphere.infra.metadata.user.Grantee; +import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser; import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import java.net.InetSocketAddress; @@ -172,8 +173,12 @@ private void processStreamDataRequest(final ChannelHandlerContext ctx, final CDC throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Source schema table is empty")); } checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), requestBody.getDatabase()); - CDCResponse response = backendHandler.streamData(request.getRequestId(), requestBody, connectionContext, ctx.channel()); - ctx.writeAndFlush(response); + try { + CDCResponse response = backendHandler.streamData(request.getRequestId(), requestBody, connectionContext, ctx.channel()); + ctx.writeAndFlush(response); + } catch (final PipelineSQLException ex) { + throw new CDCExceptionWrapper(request.getRequestId(), ex); + } } private void processAckStreamingRequest(final CDCRequest request) { diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java index 33af5f101a3f8..efc2ca5f19253 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java @@ -20,19 +20,19 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; -import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; -import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient; import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionHandler; +import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; +import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader; +import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; +import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; @@ -169,10 +169,9 @@ private DataSource createStandardDataSource(final PipelineContainerComposer cont private void startCDCClient(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) { DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4); DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType()); - CDCClientConfiguration cdcConfig = new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new LoggerExceptionHandler()); String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : ""; - CDCClient cdcClient = new CDCClient(cdcConfig); - cdcClient.connect(); + CDCClient cdcClient = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000)); + cdcClient.connect(recordConsumer, new RetryStreamingExceptionHandler(cdcClient, 5, 5000), (ctx, result) -> log.error("Server error: {}", result.getErrorMessage())); cdcClient.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)); // TODO add full=false test case later cdcClient.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),