From f7517c184a3a42e75b68b27207aa2c2b9abeda33 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 18 Nov 2023 14:43:29 +0800 Subject: [PATCH] Add YamlPipelineJobItemProgressSwapper --- .../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index dc0bda5338011..1e848b381a4ec 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -176,7 +176,7 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) { IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames())); InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext); PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress( - jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress))); + jobId, i, YamlEngine.marshal(getYamlPipelineJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress))); } } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobId, ex);