From ba88f53093702bfbacfac007f1460cfc6dc3d253 Mon Sep 17 00:00:00 2001 From: Xinze Guo <101622833+azexcy@users.noreply.github.com> Date: Wed, 22 Nov 2023 17:25:45 +0800 Subject: [PATCH] Use deepEquals instead of equals at MySQLIncrementalDumper (#29127) * Use deepEquals instead of equals * Clean invalid job config * Persist valid job config --- .../data/pipeline/mysql/ingest/MySQLIncrementalDumper.java | 2 +- .../core/job/service/GovernanceRepositoryAPIImplTest.java | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index e3e093d777cbd..d07019cefc6e3 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -179,7 +179,7 @@ private List handleUpdateRowsEvent(final UpdateRowsEvent event, fina for (int j = 0; j < beforeValues.length; j++) { Serializable oldValue = beforeValues[j]; Serializable newValue = afterValues[j]; - boolean updated = !Objects.equals(newValue, oldValue); + boolean updated = !Objects.deepEquals(newValue, oldValue); PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1); dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, oldValue), diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java index cf5203910f467..7806cda61dcb9 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java @@ -31,6 +31,8 @@ import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext; +import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; +import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.manager.ContextManager; @@ -85,7 +87,10 @@ private static void watch() { @Test void assertIsJobConfigurationExisted() { assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job")); - getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", "foo"); + JobConfigurationPOJO value = new JobConfigurationPOJO(); + value.setJobName("foo_job"); + value.setShardingTotalCount(1); + getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", YamlEngine.marshal(value)); assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job")); }