Skip to content

Commit

Permalink
Refactor PipelineSQLSegmentBuilder (#33958)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Dec 7, 2024
1 parent d3e9490 commit fe80113
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;

/**
* Pipeline SQL segment builder.
Expand Down Expand Up @@ -58,4 +59,19 @@ public String getQualifiedTableName(final String schemaName, final String tableN
result.append(getEscapedIdentifier(tableName));
return result.toString();
}

/**
 * Get qualified table name.
 *
 * @param qualifiedTable qualified table
 * @return qualified table name; schema-prefixed only when the dialect supports schemas and a schema name is present
 */
public String getQualifiedTableName(final QualifiedTable qualifiedTable) {
    String escapedTableName = getEscapedIdentifier(qualifiedTable.getTableName());
    String schemaName = qualifiedTable.getSchemaName();
    // Dialects without schema support (e.g. MySQL) and absent schema names both yield the bare table name.
    if (!dialectDatabaseMetaData.isSchemaAvailable() || Strings.isNullOrEmpty(schemaName)) {
        return escapedTableName;
    }
    return getEscapedIdentifier(schemaName) + "." + escapedTableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public String buildQueryRangeOrderingSQL(final QualifiedTable qualifiedTable, fi

private String buildQueryRangeOrderingSQL0(final QualifiedTable qualifiedTable, final Collection<String> columnNames, final List<String> uniqueKeys, final QueryRange queryRange,
@Nullable final List<String> shardingColumnsNames) {
String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(qualifiedTable.getSchemaName(), qualifiedTable.getTableName());
String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(qualifiedTable);
String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
String firstUniqueKey = uniqueKeys.get(0);
String orderByColumns = joinColumns(uniqueKeys, shardingColumnsNames).stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each) + " ASC").collect(Collectors.joining(", "));
Expand Down Expand Up @@ -93,15 +93,15 @@ private String buildUpperQueryRangeCondition(final String firstUniqueKey) {
/**
* Build point query SQL.
*
* @param table qualified table
* @param qualifiedTable qualified table
* @param columnNames column names
* @param uniqueKeys unique keys, it may be primary key, not null
* @param shardingColumnsNames sharding columns names, nullable
* @return built SQL
*/
public String buildPointQuerySQL(final QualifiedTable table, final Collection<String> columnNames, final List<String> uniqueKeys,
public String buildPointQuerySQL(final QualifiedTable qualifiedTable, final Collection<String> columnNames, final List<String> uniqueKeys,
@Nullable final List<String> shardingColumnsNames) {
String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(table.getSchemaName(), table.getTableName());
String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(qualifiedTable);
String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
String equalsConditions = joinColumns(uniqueKeys, shardingColumnsNames).stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each) + "=?").collect(Collectors.joining(" AND "));
return String.format("SELECT %s FROM %s WHERE %s", queryColumns, qualifiedTableName, equalsConditions);
Expand All @@ -126,6 +126,6 @@ private List<String> joinColumns(final List<String> uniqueKeys, final @Nullable
*/
public Optional<String> buildCRC32SQL(final QualifiedTable qualifiedTable, final String columnName) {
return dialectSQLBuilder.buildCRC32SQL(
sqlSegmentBuilder.getQualifiedTableName(qualifiedTable.getSchemaName(), qualifiedTable.getTableName()), sqlSegmentBuilder.getEscapedIdentifier(columnName));
sqlSegmentBuilder.getQualifiedTableName(qualifiedTable), sqlSegmentBuilder.getEscapedIdentifier(columnName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment;

import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;

Expand All @@ -43,15 +44,18 @@ void assertGetUnescapedIdentifier() {
@Test
void assertGetQualifiedTableNameWithUnsupportedSchema() {
    // MySQL has no schema support, so both overloads must drop the schema part.
    QualifiedTable qualifiedTable = new QualifiedTable("foo_schema", "foo_tbl");
    assertThat(mySQLBuilder.getQualifiedTableName("foo_schema", "foo_tbl"), is("foo_tbl"));
    assertThat(mySQLBuilder.getQualifiedTableName(qualifiedTable), is("foo_tbl"));
}

@Test
void assertGetQualifiedTableNameWithSupportedSchema() {
    // PostgreSQL supports schemas, so both overloads must produce a schema-prefixed name.
    QualifiedTable qualifiedTable = new QualifiedTable("foo_schema", "foo_tbl");
    assertThat(postgreSQLBuilder.getQualifiedTableName("foo_schema", "foo_tbl"), is("foo_schema.foo_tbl"));
    assertThat(postgreSQLBuilder.getQualifiedTableName(qualifiedTable), is("foo_schema.foo_tbl"));
}

@Test
void assertGetQualifiedTableNameWithSupportedSchemaAndNullSchema() {
    // A null schema must fall back to the bare table name even when the dialect supports schemas.
    QualifiedTable qualifiedTable = new QualifiedTable(null, "foo_tbl");
    assertThat(postgreSQLBuilder.getQualifiedTableName(null, "foo_tbl"), is("foo_tbl"));
    assertThat(postgreSQLBuilder.getQualifiedTableName(qualifiedTable), is("foo_tbl"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLExc
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !listJobId(containerComposer).isEmpty());
String jobId = getJobIdByTableName(containerComposer, "ds_0.test." + SOURCE_TABLE_NAME);
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
String schemaTableName = String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
containerComposer.startIncrementTask(new E2EIncrementalTask(containerComposer.getSourceDataSource(), schemaTableName, new SnowflakeKeyGenerateAlgorithm(),
String qualifiedTableName = String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
containerComposer.startIncrementTask(new E2EIncrementalTask(containerComposer.getSourceDataSource(), qualifiedTableName, new SnowflakeKeyGenerateAlgorithm(),
containerComposer.getDatabaseType(), 20));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", qualifiedTableName));
DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource, schemaTableName, 10000);
containerComposer.assertOrderRecordExist(jdbcDataSource, qualifiedTableName, 10000);
checkOrderMigration(containerComposer, jobId);
startMigrationWithSchema(containerComposer, "t_order_item", "t_order_item");
checkOrderItemMigration(containerComposer);
Expand Down

0 comments on commit fe80113

Please sign in to comment.