Skip to content

Commit

Permalink
Notify the client if streaming stopped due to exception
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy committed Nov 10, 2023
1 parent 9f2110b commit 3c7cf09
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;

import io.netty.channel.Channel;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
Expand Down Expand Up @@ -55,6 +56,7 @@ public final class CDCSocketSink implements PipelineSink {

private final ShardingSphereDatabase database;

@Getter
private final Channel channel;

private final Map<String, String> tableNameSchemaMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
Expand Down Expand Up @@ -184,7 +186,7 @@ private final class CDCExecuteCallback implements ExecuteCallback {

private final String identifier;

private final PipelineJobItemContext jobItemContext;
private final CDCJobItemContext jobItemContext;

@Override
public void onSuccess() {
Expand All @@ -200,6 +202,10 @@ public void onFailure(final Throwable throwable) {
log.error("onFailure, {} task execute failed.", identifier, throwable);
String jobId = jobItemContext.getJobId();
jobAPI.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
if (jobItemContext.getSink() instanceof CDCSocketSink) {
CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink();
cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
}
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
Expand Down

0 comments on commit 3c7cf09

Please sign in to comment.