From 3c7cf0980982a016163fae8a5e322f188095f49d Mon Sep 17 00:00:00 2001 From: azexcy <13588031592@qq.com> Date: Fri, 10 Nov 2023 15:18:51 +0800 Subject: [PATCH] Notify the client if streaming stopped due to exception --- .../pipeline/cdc/core/importer/sink/CDCSocketSink.java | 2 ++ .../shardingsphere/data/pipeline/cdc/core/job/CDCJob.java | 8 +++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java index d9f5ad83012fd..d45c14b4b8c58 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink; import io.netty.channel.Channel; +import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord; @@ -55,6 +56,7 @@ public final class CDCSocketSink implements PipelineSink { private final ShardingSphereDatabase database; + @Getter private final Channel channel; private final Map tableNameSchemaMap = new HashMap<>(); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java index 9858e2932e333..50e572c5a3231 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java @@ -25,8 +25,10 @@ import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext; import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext; +import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink; import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer; import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner; +import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils; import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager; @@ -184,7 +186,7 @@ private final class CDCExecuteCallback implements ExecuteCallback { private final String identifier; - private final PipelineJobItemContext jobItemContext; + private final CDCJobItemContext jobItemContext; @Override public void onSuccess() { @@ -200,6 +202,10 @@ public void onFailure(final Throwable throwable) { log.error("onFailure, {} task execute failed.", identifier, throwable); String jobId = jobItemContext.getJobId(); jobAPI.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable); + if (jobItemContext.getSink() instanceof CDCSocketSink) { + CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink(); + cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage())); + } PipelineJobCenter.stop(jobId); jobAPI.updateJobConfigurationDisabled(jobId, true); }