Rename CaseInsensitiveQualifiedTable #28988

Merged · 1 commit · Nov 8, 2023
==== CreateTableConfiguration.java ====
@@ -21,7 +21,7 @@
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;

 /**
  * Create table configuration.
@@ -33,9 +33,9 @@ public final class CreateTableConfiguration {

     private final PipelineDataSourceConfiguration sourceDataSourceConfig;

-    private final SchemaTableName sourceName;
+    private final CaseInsensitiveQualifiedTable sourceName;

     private final PipelineDataSourceConfiguration targetDataSourceConfig;

-    private final SchemaTableName targetName;
+    private final CaseInsensitiveQualifiedTable targetName;
 }
==== SchemaTableName.java → CaseInsensitiveQualifiedTable.java (renamed) ====
@@ -20,33 +20,27 @@
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.ToString;

 /**
- * Schema name and table name.
+ * Case insensitive qualified table.
  */
 @RequiredArgsConstructor
 @Getter
 @EqualsAndHashCode
-@ToString
-public final class SchemaTableName {
+// TODO should merge with QualifiedTable
+public final class CaseInsensitiveQualifiedTable {

     private final CaseInsensitiveIdentifier schemaName;

     private final CaseInsensitiveIdentifier tableName;

-    public SchemaTableName(final String schemaName, final String tableName) {
+    public CaseInsensitiveQualifiedTable(final String schemaName, final String tableName) {
         this.schemaName = new CaseInsensitiveIdentifier(schemaName);
         this.tableName = new CaseInsensitiveIdentifier(tableName);
     }

-    /**
-     * Marshal to text.
-     *
-     * @return text
-     */
-    public String marshal() {
-        String schemaName = this.schemaName.toString();
-        return null == schemaName ? tableName.toString() : schemaName + "." + tableName;
+    @Override
+    public String toString() {
+        return null == schemaName ? tableName.toString() : String.join(".", schemaName.toString(), tableName.toString());
     }
 }
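A minimal usage sketch of the renamed class (the demo class and printed values are illustrative, not part of this PR; it assumes CaseInsensitiveIdentifier compares case-insensitively, as its name and the @EqualsAndHashCode annotation above suggest):

    import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;

    public final class QualifiedTableDemo {

        public static void main(final String[] args) {
            // Both parts are wrapped in CaseInsensitiveIdentifier, so equality
            // should ignore case differences between the two instances.
            CaseInsensitiveQualifiedTable upper = new CaseInsensitiveQualifiedTable("PUBLIC", "T_ORDER");
            CaseInsensitiveQualifiedTable lower = new CaseInsensitiveQualifiedTable("public", "t_order");
            System.out.println(upper.equals(lower)); // expected: true
            // toString() now renders the schema-qualified name directly,
            // replacing the removed marshal() method.
            System.out.println(lower); // expected: public.t_order
        }
    }

Replacing marshal() with an overridden toString() lets callers format the qualified name implicitly, for example in log messages and in the Collectors.toMap key extraction changed further down in this PR.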
==== TableInventoryCheckParameter.java ====
@@ -19,7 +19,7 @@

 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
@@ -40,9 +40,9 @@ public final class TableInventoryCheckParameter {

     private final PipelineDataSourceWrapper targetDataSource;

-    private final SchemaTableName sourceTable;
+    private final CaseInsensitiveQualifiedTable sourceTable;

-    private final SchemaTableName targetTable;
+    private final CaseInsensitiveQualifiedTable targetTable;

     private final List<String> columnNames;

==== SingleTableInventoryCalculateParameter.java ====
@@ -19,7 +19,7 @@

 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -40,7 +40,7 @@ public final class SingleTableInventoryCalculateParameter {
      */
     private final PipelineDataSourceWrapper dataSource;

-    private final SchemaTableName table;
+    private final CaseInsensitiveQualifiedTable table;

     private final List<String> columnNames;

==== CRC32SingleTableInventoryCalculatorTest.java ====
@@ -17,7 +17,7 @@

 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;

-import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
@@ -65,7 +65,8 @@ class CRC32SingleTableInventoryCalculatorTest {
     void setUp() throws SQLException {
         DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
         List<PipelineColumnMetaData> uniqueKeys = Collections.singletonList(new PipelineColumnMetaData(1, "id", Types.INTEGER, "integer", false, true, true));
-        parameter = new SingleTableInventoryCalculateParameter(pipelineDataSource, new SchemaTableName(null, "foo_tbl"), Arrays.asList("foo_col", "bar_col"), uniqueKeys, Collections.emptyMap());
+        parameter = new SingleTableInventoryCalculateParameter(pipelineDataSource,
+                new CaseInsensitiveQualifiedTable(null, "foo_tbl"), Arrays.asList("foo_col", "bar_col"), uniqueKeys, Collections.emptyMap());
         when(pipelineDataSource.getDatabaseType()).thenReturn(databaseType);
         when(pipelineDataSource.getConnection()).thenReturn(connection);
     }
====
@@ -41,7 +41,7 @@
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
@@ -284,8 +284,8 @@ private Collection<CreateTableConfiguration> buildCreateTableConfigurations(fina
             DataNode dataNode = each.getDataNodes().get(0);
             PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName());
             CreateTableConfiguration createTableConfig = new CreateTableConfiguration(
-                    sourceDataSourceConfig, new SchemaTableName(sourceSchemaName, dataNode.getTableName()),
-                    jobConfig.getTarget(), new SchemaTableName(targetSchemaName, each.getLogicTableName()));
+                    sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
+                    jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
             result.add(createTableConfig);
         }
         log.info("buildCreateTableConfigurations, result={}", result);
====
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;

 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
@@ -86,15 +86,15 @@ public Map<String, TableDataConsistencyCheckResult> check(final String algorithm
         progressContext.setRecordsCount(getRecordsCount());
         progressContext.getTableNames().addAll(sourceTableNames);
         progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
-        Map<SchemaTableName, TableDataConsistencyCheckResult> result = new LinkedHashMap<>();
+        Map<CaseInsensitiveQualifiedTable, TableDataConsistencyCheckResult> result = new LinkedHashMap<>();
         try (
                 PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
                 TableDataConsistencyChecker tableChecker = TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) {
             for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
                 checkTableInventoryData(each, tableChecker, result, dataSourceManager);
             }
         }
-        return result.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().marshal(), Entry::getValue));
+        return result.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().toString(), Entry::getValue));
     }

     private long getRecordsCount() {
@@ -103,11 +103,11 @@ private long getRecordsCount() {
     }

     private void checkTableInventoryData(final JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
-                                         final Map<SchemaTableName, TableDataConsistencyCheckResult> checkResultMap, final PipelineDataSourceManager dataSourceManager) {
+                                         final Map<CaseInsensitiveQualifiedTable, TableDataConsistencyCheckResult> checkResultMap, final PipelineDataSourceManager dataSourceManager) {
         for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) {
             for (DataNode each : entry.getDataNodes()) {
                 TableDataConsistencyCheckResult checkResult = checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, dataSourceManager);
-                checkResultMap.put(new SchemaTableName(each.getSchemaName(), each.getTableName()), checkResult);
+                checkResultMap.put(new CaseInsensitiveQualifiedTable(each.getSchemaName(), each.getTableName()), checkResult);
                 if (!checkResult.isMatched() && tableChecker.isBreakOnInventoryCheckNotMatched()) {
                     log.info("Unmatched on table '{}', ignore left tables", DataNodeUtils.formatWithSchema(each));
                     return;
@@ -118,8 +118,8 @@

     private TableDataConsistencyCheckResult checkSingleTableInventoryData(final String targetTableName, final DataNode dataNode,
                                                                           final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager dataSourceManager) {
-        SchemaTableName sourceTable = new SchemaTableName(dataNode.getSchemaName(), dataNode.getTableName());
-        SchemaTableName targetTable = new SchemaTableName(dataNode.getSchemaName(), targetTableName);
+        CaseInsensitiveQualifiedTable sourceTable = new CaseInsensitiveQualifiedTable(dataNode.getSchemaName(), dataNode.getTableName());
+        CaseInsensitiveQualifiedTable targetTable = new CaseInsensitiveQualifiedTable(dataNode.getSchemaName(), targetTableName);
         PipelineDataSourceWrapper sourceDataSource = dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
         PipelineDataSourceWrapper targetDataSource = dataSourceManager.getDataSource(jobConfig.getTarget());
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
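The hunks above also change how check results are keyed in the returned map: the string key now comes from the overridden toString() rather than the removed marshal(). A small self-contained sketch of that collect step (Boolean stands in for TableDataConsistencyCheckResult; the demo class name is illustrative):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.stream.Collectors;
    import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;

    public final class CheckResultKeyDemo {

        public static void main(final String[] args) {
            Map<CaseInsensitiveQualifiedTable, Boolean> result = new LinkedHashMap<>();
            result.put(new CaseInsensitiveQualifiedTable("public", "t_order"), true);
            // Same shape as check(): flatten to a String-keyed map, with the
            // key produced by the table's toString() instead of marshal().
            Map<String, Boolean> byName = result.entrySet().stream()
                    .collect(Collectors.toMap(entry -> entry.getKey().toString(), Entry::getValue));
            System.out.println(byName.keySet()); // expected: [public.t_order]
        }
    }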
====
@@ -29,7 +29,7 @@
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
@@ -132,14 +132,14 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
         }
         Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS)
                 .until(() -> listOrderRecords(containerComposer, getOrderTableNameWithSchema(dialectDatabaseMetaData)).size() == actualProxyList.size());
-        SchemaTableName orderSchemaTableName = dialectDatabaseMetaData.isSchemaAvailable()
-                ? new SchemaTableName(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
-                : new SchemaTableName(null, SOURCE_TABLE_NAME);
+        CaseInsensitiveQualifiedTable orderSchemaTableName = dialectDatabaseMetaData.isSchemaAvailable()
+                ? new CaseInsensitiveQualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
+                : new CaseInsensitiveQualifiedTable(null, SOURCE_TABLE_NAME);
         PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(dataSource, containerComposer.getDatabaseType());
         PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4),
                 containerComposer.getDatabaseType());
         assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName);
-        assertDataMatched(sourceDataSource, targetDataSource, new SchemaTableName(null, "t_address"));
+        assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_address"));
         containerComposer.proxyExecuteWithLog(String.format("DROP STREAMING '%s'", jobId), 0);
         assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
     }
@@ -191,7 +191,7 @@ private String getOrderTableNameWithSchema(final DialectDatabaseMetaData dialect
         return dialectDatabaseMetaData.isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
     }

-    private void assertDataMatched(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final SchemaTableName schemaTableName) {
+    private void assertDataMatched(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final CaseInsensitiveQualifiedTable schemaTableName) {
         StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().toString(), schemaTableName.getTableName().toString());
         List<PipelineColumnMetaData> uniqueKeys = Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
====
@@ -19,7 +19,7 @@

 import com.zaxxer.hikari.HikariDataSource;
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
@@ -108,6 +108,6 @@ void assertCalculateOfAllQueryFromMiddle() {

     private SingleTableInventoryCalculateParameter generateParameter(final PipelineDataSourceWrapper dataSource, final Object dataCheckPosition) {
         List<PipelineColumnMetaData> uniqueKeys = Collections.singletonList(new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "integer", false, true, true));
-        return new SingleTableInventoryCalculateParameter(dataSource, new SchemaTableName(null, "t_order"), Collections.emptyList(), uniqueKeys, dataCheckPosition);
+        return new SingleTableInventoryCalculateParameter(dataSource, new CaseInsensitiveQualifiedTable(null, "t_order"), Collections.emptyList(), uniqueKeys, dataCheckPosition);
     }
 }