diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index e3e093d777cbd..d07019cefc6e3 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -179,7 +179,7 @@ private List handleUpdateRowsEvent(final UpdateRowsEvent event, fina for (int j = 0; j < beforeValues.length; j++) { Serializable oldValue = beforeValues[j]; Serializable newValue = afterValues[j]; - boolean updated = !Objects.equals(newValue, oldValue); + boolean updated = !Objects.deepEquals(newValue, oldValue); PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1); dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, oldValue), diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java index cf5203910f467..7806cda61dcb9 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java @@ -31,6 +31,8 @@ import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext; +import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; +import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.manager.ContextManager; @@ -85,7 +87,10 @@ private static void watch() { @Test void assertIsJobConfigurationExisted() { assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job")); - getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", "foo"); + JobConfigurationPOJO value = new JobConfigurationPOJO(); + value.setJobName("foo_job"); + value.setShardingTotalCount(1); + getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", YamlEngine.marshal(value)); assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job")); }