Skip to content

Commit

Permalink
Improve parameter and error log
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy committed Nov 8, 2023
1 parent 9cbbfcb commit 33b1c19
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 77 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,39 @@
* Retry streaming exception handler.
*/
@Slf4j
public class RetryStreamingExceptionHandler implements ExceptionHandler {
public final class RetryStreamingExceptionHandler implements ExceptionHandler {

private final CDCClient cdcClient;

private final AtomicInteger maxRetryTimes = new AtomicInteger(0);
private final AtomicInteger maxRetryTimes;

private final int retryIntervalMills;

private final int retryTimes;
private final AtomicInteger retryTimes = new AtomicInteger(0);

public RetryStreamingExceptionHandler(final CDCClient cdcClient, final int maxRetryTimes, final int retryIntervalMills) {
this.cdcClient = cdcClient;
this.maxRetryTimes.set(maxRetryTimes);
this.maxRetryTimes = new AtomicInteger(maxRetryTimes);
this.retryIntervalMills = retryIntervalMills;
retryTimes = 0;
}

@Override
public void handleException(final ChannelHandlerContext ctx, final Throwable throwable) {
log.error("Socket error: {}", throwable.getMessage());
log.error("Catch exception: ", throwable);
reconnect(ctx);
}

@SneakyThrows(InterruptedException.class)
private void reconnect(final ChannelHandlerContext ctx) {
maxRetryTimes.incrementAndGet();
if (null == cdcClient) {
log.warn("CDC client is null, could not retry");
return;
}
retryTimes.incrementAndGet();
ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
if (retryTimes > maxRetryTimes.get()) {
log.warn("Retry times exceed 5, stop streaming");
if (retryTimes.get() > maxRetryTimes.get()) {
log.warn("Stop try to reconnect, stop streaming ids: {}", connectionContext.getStreamingIds());
connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.stopStreaming(each)));
return;
}
TimeUnit.MILLISECONDS.sleep(retryIntervalMills);
log.info("Retry to restart streaming, retry count: {}", maxRetryTimes.get());
log.info("Retry to restart streaming, retry times: {}", retryTimes.get());
connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.restartStreaming(each)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionErrorHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
Expand All @@ -42,24 +41,13 @@ public static void main(final String[] args) {
// Pay attention to the time zone, to avoid the problem of incorrect time zone, it is best to ensure that the time zone of the program is consistent with the time zone of the database server
// TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
String address = "127.0.0.1";
int reconnectCount = 0;
int maxReconnectCount = 5;
while (reconnectCount < maxReconnectCount) {
try (CDCClient cdcClient = new CDCClient(new CDCClientConfiguration(address, 33071, 5000))) {
LoggerExceptionErrorHandler loggerExceptionErrorHandler = new LoggerExceptionErrorHandler();
cdcClient.connect(records -> log.info("records: {}", records), new RetryStreamingExceptionHandler(cdcClient, 5, 5000), loggerExceptionErrorHandler);
cdcClient.login(new CDCLoginParameter("root", "root"));
String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order123").build()), true));
log.info("Streaming id={}", streamingId);
cdcClient.await();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
log.error("Exception occur: {}", ex.getMessage());
}
Thread.sleep(5000);
log.info("Reconnect count: {}", reconnectCount);
reconnectCount++;
try (CDCClient cdcClient = new CDCClient(new CDCClientConfiguration(address, 33071, 10000))) {
cdcClient.connect(records -> log.info("records: {}", records), new RetryStreamingExceptionHandler(cdcClient, 5, 5000),
(ctx, result) -> log.error("Server error: {}", result.getErrorMessage()));
cdcClient.login(new CDCLoginParameter("root", "root"));
String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order").build()), true));
log.info("Streaming id={}", streamingId);
cdcClient.await();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionErrorHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
Expand Down Expand Up @@ -171,8 +171,7 @@ private void startCDCClient(final PipelineContainerComposer containerComposer, f
DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : "";
CDCClient cdcClient = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
LoggerExceptionErrorHandler handler = new LoggerExceptionErrorHandler();
cdcClient.connect(recordConsumer, handler, handler);
cdcClient.connect(recordConsumer, new RetryStreamingExceptionHandler(cdcClient, 5, 5000), (ctx, result) -> log.error("Server error: {}", result.getErrorMessage()));
cdcClient.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
// TODO add full=false test case later
cdcClient.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
Expand Down

0 comments on commit 33b1c19

Please sign in to comment.