Skip to content

Commit

Permalink
Remove LogicTableName
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 7, 2023
1 parent 9d33b01 commit fd683aa
Show file tree
Hide file tree
Showing 16 changed files with 91 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;

Expand All @@ -45,7 +45,7 @@ public final class ImporterConfiguration {
private final PipelineDataSourceConfiguration dataSourceConfig;

// TODO columnName case-insensitive?
private final Map<LogicTableName, Set<String>> shardingColumnsMap;
private final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap;

private final TableAndSchemaNameMapper tableAndSchemaNameMapper;

Expand All @@ -63,7 +63,7 @@ public final class ImporterConfiguration {
* @return logic table names
*/
public Collection<String> getLogicTableNames() {
return Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(LogicTableName::toString).collect(Collectors.toList()));
return Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(CaseInsensitiveIdentifier::toString).collect(Collectors.toList()));
}

/**
Expand All @@ -73,7 +73,7 @@ public Collection<String> getLogicTableNames() {
* @return sharding columns
*/
public Set<String> getShardingColumns(final String logicTableName) {
return shardingColumnsMap.getOrDefault(new LogicTableName(logicTableName), Collections.emptySet());
return shardingColumnsMap.getOrDefault(new CaseInsensitiveIdentifier(logicTableName), Collections.emptySet());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.common.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.infra.datanode.DataNode;

import java.util.LinkedHashMap;
Expand Down Expand Up @@ -74,11 +73,11 @@ private static Map<String, Map<String, List<DataNode>>> groupDataSourceDataNodes
* @param dataNodeLine data node line
* @return actual table and logic table map
*/
public static Map<ActualTableName, LogicTableName> buildTableNameMap(final JobDataNodeLine dataNodeLine) {
Map<ActualTableName, LogicTableName> result = new LinkedHashMap<>();
public static Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> buildTableNameMap(final JobDataNodeLine dataNodeLine) {
Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> result = new LinkedHashMap<>();
for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
for (DataNode dataNode : each.getDataNodes()) {
result.put(new ActualTableName(dataNode.getTableName()), new LogicTableName(each.getLogicTableName()));
result.put(new CaseInsensitiveIdentifier(dataNode.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName()));
}
}
return result;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
*/
// TODO table name case-sensitive for some database
@EqualsAndHashCode(of = "lowercase")
public class CaseInsensitiveIdentifier {
public final class CaseInsensitiveIdentifier {

private final String original;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.common.util;

import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
Expand Down Expand Up @@ -48,16 +48,16 @@ public final class ShardingColumnsExtractor {
* @param logicTableNames logic table names
* @return sharding columns map
*/
public Map<LogicTableName, Set<String>> getShardingColumnsMap(final Collection<YamlRuleConfiguration> yamlRuleConfigs, final Set<LogicTableName> logicTableNames) {
public Map<CaseInsensitiveIdentifier, Set<String>> getShardingColumnsMap(final Collection<YamlRuleConfiguration> yamlRuleConfigs, final Set<CaseInsensitiveIdentifier> logicTableNames) {
Optional<ShardingRuleConfiguration> shardingRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(yamlRuleConfigs);
if (!shardingRuleConfig.isPresent()) {
return Collections.emptyMap();
}
Set<String> defaultDatabaseShardingColumns = extractShardingColumns(shardingRuleConfig.get().getDefaultDatabaseShardingStrategy());
Set<String> defaultTableShardingColumns = extractShardingColumns(shardingRuleConfig.get().getDefaultTableShardingStrategy());
Map<LogicTableName, Set<String>> result = new ConcurrentHashMap<>();
Map<CaseInsensitiveIdentifier, Set<String>> result = new ConcurrentHashMap<>();
for (ShardingTableRuleConfiguration each : shardingRuleConfig.get().getTables()) {
LogicTableName logicTableName = new LogicTableName(each.getLogicTable());
CaseInsensitiveIdentifier logicTableName = new CaseInsensitiveIdentifier(each.getLogicTable());
if (!logicTableNames.contains(logicTableName)) {
continue;
}
Expand All @@ -67,7 +67,7 @@ public Map<LogicTableName, Set<String>> getShardingColumnsMap(final Collection<Y
result.put(logicTableName, shardingColumns);
}
for (ShardingAutoTableRuleConfiguration each : shardingRuleConfig.get().getAutoTables()) {
LogicTableName logicTableName = new LogicTableName(each.getLogicTable());
CaseInsensitiveIdentifier logicTableName = new CaseInsensitiveIdentifier(each.getLogicTable());
if (!logicTableNames.contains(logicTableName)) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.common.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;

import java.util.Map;

Expand All @@ -33,16 +32,16 @@
@ToString
public final class ActualAndLogicTableNameMapper {

private final Map<ActualTableName, LogicTableName> tableNameMap;
private final Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> tableNameMap;

/**
* Get logic table name.
*
* @param actualTableName actual table name
* @return logic table name
*/
public LogicTableName getLogicTableName(final String actualTableName) {
return tableNameMap.get(new ActualTableName(actualTableName));
public CaseInsensitiveIdentifier getLogicTableName(final String actualTableName) {
return tableNameMap.get(new CaseInsensitiveIdentifier(actualTableName));
}

/**
Expand All @@ -52,6 +51,6 @@ public LogicTableName getLogicTableName(final String actualTableName) {
* @return contains or not
*/
public boolean containsTable(final String actualTableName) {
return tableNameMap.containsKey(new ActualTableName(actualTableName));
return tableNameMap.containsKey(new CaseInsensitiveIdentifier(actualTableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper;

import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -33,7 +33,7 @@
@ToString
public final class TableAndSchemaNameMapper {

private final Map<LogicTableName, String> mapping;
private final Map<CaseInsensitiveIdentifier, String> mapping;

public TableAndSchemaNameMapper(final Map<String, String> tableSchemaMap) {
mapping = null == tableSchemaMap ? Collections.emptyMap() : getLogicTableNameMap(tableSchemaMap);
Expand All @@ -44,13 +44,13 @@ public TableAndSchemaNameMapper(final Collection<String> tableNames) {
mapping = getLogicTableNameMap(tableNameSchemaMap);
}

private Map<LogicTableName, String> getLogicTableNameMap(final Map<String, String> tableSchemaMap) {
Map<LogicTableName, String> result = new HashMap<>(tableSchemaMap.size(), 1F);
private Map<CaseInsensitiveIdentifier, String> getLogicTableNameMap(final Map<String, String> tableSchemaMap) {
Map<CaseInsensitiveIdentifier, String> result = new HashMap<>(tableSchemaMap.size(), 1F);
for (Entry<String, String> entry : tableSchemaMap.entrySet()) {
String tableName = entry.getKey();
String schemaName = entry.getValue();
if (null != schemaName) {
result.put(new LogicTableName(tableName), schemaName);
result.put(new CaseInsensitiveIdentifier(tableName), schemaName);
}
}
return result;
Expand All @@ -63,7 +63,7 @@ private Map<LogicTableName, String> getLogicTableNameMap(final Map<String, Strin
* @return schema name
*/
public String getSchemaName(final String logicTableName) {
return mapping.get(new LogicTableName(logicTableName));
return mapping.get(new CaseInsensitiveIdentifier(logicTableName));
}

/**
Expand All @@ -72,7 +72,7 @@ public String getSchemaName(final String logicTableName) {
* @param logicTableName logic table name
* @return schema name
*/
public String getSchemaName(final LogicTableName logicTableName) {
public String getSchemaName(final CaseInsensitiveIdentifier logicTableName) {
return mapping.get(logicTableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
Expand Down Expand Up @@ -156,7 +156,7 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent even
}

private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
LogicTableName logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
CaseInsensitiveIdentifier logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,22 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest;

import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.common.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.common.ingest.channel.EmptyAckCallback;
import org.apache.shardingsphere.data.pipeline.common.ingest.channel.memory.SimpleMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
Expand Down Expand Up @@ -99,7 +98,7 @@ void setUp() throws SQLException {
private IncrementalDumperContext createDumperContext() {
DumperCommonContext commonContext = new DumperCommonContext(null,
new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"),
new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))),
new ActualAndLogicTableNameMapper(Collections.singletonMap(new CaseInsensitiveIdentifier("t_order"), new CaseInsensitiveIdentifier("t_order"))),
new TableAndSchemaNameMapper(Collections.emptyMap()));
return new IncrementalDumperContext(commonContext, null, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;

import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
Expand Down Expand Up @@ -90,7 +90,7 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event)
}

private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
LogicTableName logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
CaseInsensitiveIdentifier logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
}

Expand Down
Loading

0 comments on commit fd683aa

Please sign in to comment.