Skip to content

Commit

Permalink
Add RetryStreamingExceptionHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy committed Nov 7, 2023
1 parent f39232e commit cff349c
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public final class CDCClient implements AutoCloseable {
public CDCClient(final CDCClientConfiguration config) {
validateParameter(config);
this.config = config;
if (null != config.getExceptionHandler()) {
config.getExceptionHandler().setCDCClient(this);
}
}

private void validateParameter(final CDCClientConfiguration parameter) {
Expand Down Expand Up @@ -165,9 +168,6 @@ public String startStreaming(final StartStreamingParameter parameter) {
* @param streamingId streaming id
*/
public void restartStreaming(final String streamingId) {
if (checkStreamingIdExist(streamingId)) {
stopStreaming(streamingId);
}
String requestId = RequestIdUtils.generateRequestId();
StartStreamingRequestBody body = StartStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
CDCRequest request = CDCRequest.newBuilder().setRequestId(requestId).setType(Type.START_STREAMING).setStartStreamingRequestBody(body).build();
Expand All @@ -179,16 +179,6 @@ public void restartStreaming(final String streamingId) {
log.info("Restart streaming success, streaming id: {}", streamingId);
}

private boolean checkStreamingIdExist(final String streamingId) {
checkChannelActive();
ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
if (null == connectionContext) {
log.warn("The connection context not exist");
return true;
}
return connectionContext.getStreamingIds().contains(streamingId);
}

/**
* Stop streaming.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
future.countDown();
requestType = future.getRequestType();
}
exceptionHandler.handleServerException(new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType));
exceptionHandler.handleServerException(ctx, new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType));
responseFuture.ifPresent(future -> {
future.setErrorCode(response.getErrorCode());
future.setErrorMessage(response.getErrorMessage());
Expand Down Expand Up @@ -113,6 +113,6 @@ private void processDataRecords(final ChannelHandlerContext ctx, final DataRecor

@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
exceptionHandler.handleSocketException(cause);
exceptionHandler.handleSocketException(ctx, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.shardingsphere.data.pipeline.cdc.client.handler;

import io.netty.channel.ChannelHandlerContext;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult;

/**
Expand All @@ -27,14 +29,24 @@ public interface ExceptionHandler {
/**
* Handle server exception.
*
* @param ctx channel handler context
* @param result error result
*/
void handleServerException(ServerErrorResult result);
void handleServerException(ChannelHandlerContext ctx, ServerErrorResult result);

/**
* Handle socket exception.
*
* @param ctx channel handler context
* @param throwable throwable
*/
void handleSocketException(Throwable throwable);
void handleSocketException(ChannelHandlerContext ctx, Throwable throwable);

/**
* Set CDC client.
*
* @param cdcClient cdc client
*/
default void setCDCClient(CDCClient cdcClient) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.data.pipeline.cdc.client.handler;

import io.netty.channel.ChannelHandlerContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult;
Expand All @@ -26,15 +27,15 @@
*/
@RequiredArgsConstructor
@Slf4j
public class LoggerExceptionHandler implements ExceptionHandler {
public final class LoggerExceptionHandler implements ExceptionHandler {

@Override
public void handleServerException(final ServerErrorResult result) {
public void handleServerException(final ChannelHandlerContext ctx, final ServerErrorResult result) {
log.error("Server error, code: {}, message: {}", result.getErrorCode(), result.getErrorMessage());
}

@Override
public void handleSocketException(final Throwable throwable) {
public void handleSocketException(final ChannelHandlerContext ctx, final Throwable throwable) {
log.error("Socket error: ", throwable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.cdc.client.handler;

import io.netty.channel.ChannelHandlerContext;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Retry streaming exception handler.
*/
@RequiredArgsConstructor
@Slf4j
public class RetryStreamingExceptionHandler implements ExceptionHandler {

private final AtomicInteger retryTimes = new AtomicInteger(0);

private CDCClient cdcClient;

@Override
public void setCDCClient(final CDCClient cdcClient) {
this.cdcClient = cdcClient;
}

@Override
public void handleServerException(final ChannelHandlerContext ctx, final ServerErrorResult result) {
// Server exception maybe not be resolved by retrying
log.error("Server error, code: {}, message: {}", result.getErrorCode(), result.getErrorMessage());
}

@SneakyThrows(InterruptedException.class)
@Override
public void handleSocketException(final ChannelHandlerContext ctx, final Throwable throwable) {
if (null == cdcClient) {
log.warn("CDC client is null, could not retry");
return;
}
ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
if (retryTimes.get() > 5) {
log.warn("Retry times exceed 5, stop streaming");
connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.stopStreaming(each)));
return;
}
retryTimes.incrementAndGet();
TimeUnit.SECONDS.sleep(Math.min(5 >> retryTimes.get(), 10));
connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.restartStreaming(each)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
Expand Down Expand Up @@ -169,7 +169,7 @@ private DataSource createStandardDataSource(final PipelineContainerComposer cont
private void startCDCClient(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
CDCClientConfiguration cdcConfig = new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new LoggerExceptionHandler());
CDCClientConfiguration cdcConfig = new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new RetryStreamingExceptionHandler());
String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : "";
CDCClient cdcClient = new CDCClient(cdcConfig);
cdcClient.connect();
Expand Down

0 comments on commit cff349c

Please sign in to comment.