diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 0e41d2f8cac04..c6cd66526876b 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -85,6 +85,7 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine; import java.sql.SQLException; +import java.time.LocalDateTime; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -223,6 +224,7 @@ public void updateJobConfigurationDisabled(final String jobId, final boolean dis JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); jobConfigPOJO.setDisabled(disabled); if (disabled) { + jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(PipelineJobConfiguration.DATE_TIME_FORMATTER)); jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis())); } else { jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java index d45c14b4b8c58..9c84a6a9112ae 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java @@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; +import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import java.io.IOException; @@ -117,5 +118,6 @@ private void doAwait() { @Override public void close() throws IOException { + channel.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), "The socket channel is closed.")); } }