diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index f4aea914c980f..22ef95c6ab069 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -17,14 +17,12 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest; -import lombok.SneakyThrows; import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; -import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; @@ -55,7 +53,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; -import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -80,8 +77,8 @@ class MySQLIncrementalDumperTest { private PipelineTableMetaData pipelineTableMetaData; @BeforeEach - void setUp() { - IncrementalDumperContext dumperContext = mockDumperContext(); + void setUp() throws SQLException { + IncrementalDumperContext dumperContext = createDumperContext(); initTableData(dumperContext); dumperContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root")); PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class); @@ -91,7 +88,7 @@ void setUp() { when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData); } - private IncrementalDumperContext mockDumperContext() { + private IncrementalDumperContext createDumperContext() { IncrementalDumperContext result = new IncrementalDumperContext(); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root")); result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); @@ -99,8 +96,7 @@ private IncrementalDumperContext mockDumperContext() { return result; } - @SneakyThrows(SQLException.class) - private void initTableData(final IncrementalDumperContext dumperContext) { + private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException { try ( PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); @@ -125,62 +121,70 @@ private List mockOrderColumnsMetaDataList() { } @Test - void assertWriteRowsEventWithoutCustomColumns() throws ReflectiveOperationException { - assertWriteRowsEvent0(3); - } - - private void assertWriteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException { - WriteRowsEvent rowsEvent = new WriteRowsEvent(); - rowsEvent.setDatabaseName(""); - rowsEvent.setTableName("t_order"); - rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); - Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", WriteRowsEvent.class, PipelineTableMetaData.class); - List actual = (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); + void assertWriteRowsEvent() throws ReflectiveOperationException { + List actual = getRecordsByWriteRowsEvent(createWriteRowsEvent()); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.INSERT)); - assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount)); + assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3)); } - private Map> mockTargetTableColumnsMap() { - return Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id"))); + private WriteRowsEvent createWriteRowsEvent() { + WriteRowsEvent result = new WriteRowsEvent(); + result.setDatabaseName(""); + result.setTableName("t_order"); + result.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); + return result; } - @Test - void assertUpdateRowsEventWithoutCustomColumns() throws ReflectiveOperationException { - assertUpdateRowsEvent0(3); + private List getRecordsByWriteRowsEvent(final WriteRowsEvent rowsEvent) throws ReflectiveOperationException { + Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", WriteRowsEvent.class, PipelineTableMetaData.class); + return (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); } - private void assertUpdateRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException { - UpdateRowsEvent rowsEvent = new UpdateRowsEvent(); - rowsEvent.setDatabaseName("test"); - rowsEvent.setTableName("t_order"); - rowsEvent.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); - rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK2"})); - Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", UpdateRowsEvent.class, PipelineTableMetaData.class); - List actual = (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); + @Test + void assertUpdateRowsEvent() throws ReflectiveOperationException { + List actual = getRecordsByUpdateRowsEvent(createUpdateRowsEvent()); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.UPDATE)); - assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount)); + assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3)); } - @Test - void assertDeleteRowsEventWithoutCustomColumns() throws ReflectiveOperationException { - assertDeleteRowsEvent0(3); + private UpdateRowsEvent createUpdateRowsEvent() { + UpdateRowsEvent result = new UpdateRowsEvent(); + result.setDatabaseName("test"); + result.setTableName("t_order"); + result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); + result.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK2"})); + return result; } - private void assertDeleteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException { - DeleteRowsEvent rowsEvent = new DeleteRowsEvent(); - rowsEvent.setDatabaseName(""); - rowsEvent.setTableName("t_order"); - rowsEvent.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); - Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", DeleteRowsEvent.class, PipelineTableMetaData.class); - List actual = (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); + private List getRecordsByUpdateRowsEvent(final UpdateRowsEvent rowsEvent) throws ReflectiveOperationException { + Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", UpdateRowsEvent.class, PipelineTableMetaData.class); + return (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); + } + + @Test + void assertDeleteRowsEvent() throws ReflectiveOperationException { + List actual = getRecordsByDeleteRowsEvent(createDeleteRowsEvent()); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.DELETE)); - assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount)); + assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3)); + } + + private DeleteRowsEvent createDeleteRowsEvent() { + DeleteRowsEvent result = new DeleteRowsEvent(); + result.setDatabaseName(""); + result.setTableName("t_order"); + result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); + return result; + } + + private List getRecordsByDeleteRowsEvent(final DeleteRowsEvent rowsEvent) throws ReflectiveOperationException { + Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", DeleteRowsEvent.class, PipelineTableMetaData.class); + return (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); } @Test @@ -192,13 +196,17 @@ void assertPlaceholderEvent() throws ReflectiveOperationException { @Test void assertRowsEventFiltered() throws ReflectiveOperationException { - WriteRowsEvent rowsEvent = new WriteRowsEvent(); - rowsEvent.setDatabaseName("test"); - rowsEvent.setTableName("t_order"); - rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{1})); List actual = (List) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", AbstractBinlogEvent.class), - incrementalDumper, rowsEvent); + incrementalDumper, getFilteredWriteRowsEvent()); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); } + + private WriteRowsEvent getFilteredWriteRowsEvent() { + WriteRowsEvent result = new WriteRowsEvent(); + result.setDatabaseName("test"); + result.setTableName("t_order"); + result.setAfterRows(Collections.singletonList(new Serializable[]{1})); + return result; + } } diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index 1197dfefd8279..16ae05b1bc308 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal; -import lombok.SneakyThrows; import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; @@ -76,7 +75,7 @@ class WALEventConverterTest { private PipelineTableMetaData pipelineTableMetaData; @BeforeEach - void setUp() { + void setUp() throws SQLException { IncrementalDumperContext dumperContext = mockDumperContext(); PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()))); @@ -92,8 +91,7 @@ private IncrementalDumperContext mockDumperContext() { return result; } - @SneakyThrows(SQLException.class) - private void initTableData(final IncrementalDumperContext dumperContext) { + private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException { try ( PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); @@ -118,19 +116,23 @@ private List mockOrderColumnsMetaDataList() { } @Test - void assertWriteRowEventWithoutCustomColumns() throws ReflectiveOperationException { - assertWriteRowEvent0(3); + void assertWriteRowEvent() throws ReflectiveOperationException { + DataRecord actual = getDataRecord(createWriteRowEvent()); + assertThat(actual.getType(), is(IngestDataChangeType.INSERT)); + assertThat(actual.getColumnCount(), is(3)); + } + + private WriteRowEvent createWriteRowEvent() { + WriteRowEvent result = new WriteRowEvent(); + result.setSchemaName(""); + result.setTableName("t_order"); + result.setAfterRow(Arrays.asList(101, 1, "OK")); + return result; } - private void assertWriteRowEvent0(final int expectedColumnCount) throws ReflectiveOperationException { - WriteRowEvent rowsEvent = new WriteRowEvent(); - rowsEvent.setSchemaName(""); - rowsEvent.setTableName("t_order"); - rowsEvent.setAfterRow(Arrays.asList(101, 1, "OK")); + private DataRecord getDataRecord(final WriteRowEvent rowsEvent) throws ReflectiveOperationException { Method method = WALEventConverter.class.getDeclaredMethod("handleWriteRowEvent", WriteRowEvent.class, PipelineTableMetaData.class); - DataRecord actual = (DataRecord) Plugins.getMemberAccessor().invoke(method, walEventConverter, rowsEvent, pipelineTableMetaData); - assertThat(actual.getType(), is(IngestDataChangeType.INSERT)); - assertThat(actual.getColumnCount(), is(expectedColumnCount)); + return (DataRecord) Plugins.getMemberAccessor().invoke(method, walEventConverter, rowsEvent, pipelineTableMetaData); } @Test