Skip to content

Commit

Permalink
Remove useless DumperCommonContext.targetTableColumnsMap
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 4, 2023
1 parent 4e24cf1 commit 63d7f31
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,9 @@
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Base dumper context.
Expand All @@ -49,9 +44,6 @@ public abstract class DumperCommonContext {

private TableNameSchemaNameMapping tableNameSchemaNameMapping;

// LinkedHashSet is required
private Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap = new HashMap<>();

private IngestPosition position;

/**
Expand Down Expand Up @@ -97,26 +89,4 @@ public String getSchemaName(final LogicTableName logicTableName) {
public String getSchemaName(final ActualTableName actualTableName) {
return tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
}

/**
* Get column names.
*
* @param logicTableName logic table name
* @return column names
*/
public Collection<String> getColumnNames(final LogicTableName logicTableName) {
return targetTableColumnsMap.containsKey(logicTableName)
? targetTableColumnsMap.get(logicTableName).stream().map(ColumnName::getOriginal).collect(Collectors.toList())
: Collections.singleton("*");
}

/**
* Get column names.
*
* @param actualTableName actual table name
* @return column names
*/
public Collection<ColumnName> getColumnNames(final String actualTableName) {
return targetTableColumnsMap.getOrDefault(getLogicTableName(actualTableName), Collections.emptySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public InventoryDumperContext(final DumperCommonContext dumperContext) {
setDataSourceConfig(dumperContext.getDataSourceConfig());
setTableNameMap(dumperContext.getTableNameMap());
setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
setTargetTableColumnsMap(dumperContext.getTargetTableColumnsMap());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private String buildInventoryDumpSQL() {
}
PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getPosition();
PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
Collection<String> columnNames = dumperContext.getColumnNames(logicTableName);
Collection<String> columnNames = Collections.singleton("*");
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
Expand All @@ -55,7 +54,6 @@

import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -161,28 +159,19 @@ private PipelineTableMetaData getPipelineTableMetaData(final String actualTableN
}

private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getAfterRows()) {
DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length);
for (int i = 0; i < each.length; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
}
result.add(dataRecord);
}
return result;
}

private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final String columnName) {
return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName));
}

private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
Expand All @@ -193,9 +182,6 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, fina
Serializable newValue = afterValues[j];
boolean updated = !Objects.equals(newValue, oldValue);
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1);
if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
dataRecord.addColumn(new Column(columnMetaData.getName(),
handleValue(columnMetaData, oldValue),
handleValue(columnMetaData, newValue), updated, columnMetaData.isUniqueKey()));
Expand All @@ -206,15 +192,11 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, fina
}

private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getBeforeRows()) {
DataRecord dataRecord = createDataRecord(IngestDataChangeType.DELETE, event, each.length);
for (int i = 0, length = each.length; i < length; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), null, true, columnMetaData.isUniqueKey()));
}
result.add(dataRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,13 @@
@SuppressWarnings("unchecked")
class MySQLIncrementalDumperTest {

private IncrementalDumperContext dumperContext;

private MySQLIncrementalDumper incrementalDumper;

private PipelineTableMetaData pipelineTableMetaData;

@BeforeEach
void setUp() {
dumperContext = mockDumperContext();
IncrementalDumperContext dumperContext = mockDumperContext();
initTableData(dumperContext);
dumperContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"));
PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class);
Expand All @@ -98,7 +96,6 @@ private IncrementalDumperContext mockDumperContext() {
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
result.setTargetTableColumnsMap(Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id"))));
return result;
}

Expand Down Expand Up @@ -129,16 +126,10 @@ private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {

@Test
void assertWriteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
assertWriteRowsEvent0(Collections.emptyMap(), 3);
assertWriteRowsEvent0(3);
}

@Test
void assertWriteRowsEventWithCustomColumns() throws ReflectiveOperationException {
assertWriteRowsEvent0(mockTargetTableColumnsMap(), 1);
}

private void assertWriteRowsEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
private void assertWriteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
WriteRowsEvent rowsEvent = new WriteRowsEvent();
rowsEvent.setDatabaseName("");
rowsEvent.setTableName("t_order");
Expand All @@ -157,16 +148,10 @@ private Map<LogicTableName, Collection<ColumnName>> mockTargetTableColumnsMap()

@Test
void assertUpdateRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
assertUpdateRowsEvent0(Collections.emptyMap(), 3);
}

@Test
void assertUpdateRowsEventWithCustomColumns() throws ReflectiveOperationException {
assertUpdateRowsEvent0(mockTargetTableColumnsMap(), 1);
assertUpdateRowsEvent0(3);
}

private void assertUpdateRowsEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
private void assertUpdateRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
rowsEvent.setDatabaseName("test");
rowsEvent.setTableName("t_order");
Expand All @@ -182,16 +167,10 @@ private void assertUpdateRowsEvent0(final Map<LogicTableName, Collection<ColumnN

@Test
void assertDeleteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
assertDeleteRowsEvent0(Collections.emptyMap(), 3);
}

@Test
void assertDeleteRowsEventWithCustomColumns() throws ReflectiveOperationException {
assertDeleteRowsEvent0(mockTargetTableColumnsMap(), 1);
assertDeleteRowsEvent0(3);
}

private void assertDeleteRowsEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
private void assertDeleteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
DeleteRowsEvent rowsEvent = new DeleteRowsEvent();
rowsEvent.setDatabaseName("");
rowsEvent.setTableName("t_order");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
Expand All @@ -35,7 +34,6 @@
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;

import java.util.Collection;
import java.util.List;

/**
Expand Down Expand Up @@ -97,14 +95,13 @@ private PipelineTableMetaData getPipelineTableMetaData(final String actualTableN

private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
DataRecord result = createDataRecord(IngestDataChangeType.INSERT, writeRowEvent, writeRowEvent.getAfterRow().size());
putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getTableName(), writeRowEvent.getAfterRow());
putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getAfterRow());
return result;
}

private DataRecord handleUpdateRowEvent(final UpdateRowEvent updateRowEvent, final PipelineTableMetaData tableMetaData) {
DataRecord result = createDataRecord(IngestDataChangeType.UPDATE, updateRowEvent, updateRowEvent.getAfterRow().size());
String actualTableName = updateRowEvent.getTableName();
putColumnsIntoDataRecord(result, tableMetaData, actualTableName, updateRowEvent.getAfterRow());
putColumnsIntoDataRecord(result, tableMetaData, updateRowEvent.getAfterRow());
return result;
}

Expand All @@ -127,21 +124,13 @@ private DataRecord createDataRecord(final String type, final AbstractRowEvent ro
return result;
}

private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final String actualTableName, final List<Object> values) {
Collection<ColumnName> columnNames = dumperContext.getColumnNames(actualTableName);
private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final List<Object> values) {
for (int i = 0, count = values.size(); i < count; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
boolean isUniqueKey = columnMetaData.isUniqueKey();
Object uniqueKeyOldValue = isUniqueKey && IngestDataChangeType.UPDATE.equals(dataRecord.getType()) ? values.get(i) : null;
Column column = new Column(columnMetaData.getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
dataRecord.addColumn(column);
}
}

private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final String columnName) {
return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
Expand Down Expand Up @@ -54,7 +53,6 @@
import java.sql.Statement;
import java.sql.Types;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -71,8 +69,6 @@

class WALEventConverterTest {

private IncrementalDumperContext dumperContext;

private WALEventConverter walEventConverter;

private final LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8");
Expand All @@ -81,7 +77,7 @@ class WALEventConverterTest {

@BeforeEach
void setUp() {
dumperContext = mockDumperContext();
IncrementalDumperContext dumperContext = mockDumperContext();
PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())));
initTableData(dumperContext);
Expand Down Expand Up @@ -123,16 +119,10 @@ private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {

@Test
void assertWriteRowEventWithoutCustomColumns() throws ReflectiveOperationException {
assertWriteRowEvent0(Collections.emptyMap(), 3);
}

@Test
void assertWriteRowEventWithCustomColumns() throws ReflectiveOperationException {
assertWriteRowEvent0(mockTargetTableColumnsMap(), 1);
assertWriteRowEvent0(3);
}

private void assertWriteRowEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
private void assertWriteRowEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
WriteRowEvent rowsEvent = new WriteRowEvent();
rowsEvent.setSchemaName("");
rowsEvent.setTableName("t_order");
Expand All @@ -143,10 +133,6 @@ private void assertWriteRowEvent0(final Map<LogicTableName, Collection<ColumnNam
assertThat(actual.getColumnCount(), is(expectedColumnCount));
}

private Map<LogicTableName, Collection<ColumnName>> mockTargetTableColumnsMap() {
return Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")));
}

@Test
void assertConvertBeginTXEvent() {
BeginTXEvent beginTXEvent = new BeginTXEvent(100);
Expand Down

0 comments on commit 63d7f31

Please sign in to comment.