Skip to content

Commit

Permalink
Use deepEquals instead of equals at MySQLIncrementalDumper (#29127)
Browse files Browse the repository at this point in the history
* Use deepEquals instead of equals

* Clean invalid job config

* Persist valid job config
  • Loading branch information
azexcy authored Nov 22, 2023
1 parent 8f3e4fa commit ba88f53
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, fina
for (int j = 0; j < beforeValues.length; j++) {
Serializable oldValue = beforeValues[j];
Serializable newValue = afterValues[j];
boolean updated = !Objects.equals(newValue, oldValue);
boolean updated = !Objects.deepEquals(newValue, oldValue);
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1);
dataRecord.addColumn(new Column(columnMetaData.getName(),
handleValue(columnMetaData, oldValue),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.manager.ContextManager;
Expand Down Expand Up @@ -85,7 +87,10 @@ private static void watch() {
@Test
void assertIsJobConfigurationExisted() {
assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", "foo");
JobConfigurationPOJO value = new JobConfigurationPOJO();
value.setJobName("foo_job");
value.setShardingTotalCount(1);
getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", YamlEngine.marshal(value));
assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
}

Expand Down

0 comments on commit ba88f53

Please sign in to comment.