Refactor ImporterConfiguration.findSchemaName()
terrymanu committed Nov 4, 2023
1 parent b7169ef commit 6887a49
Showing 2 changed files with 8 additions and 12 deletions.

ImporterConfiguration.java:

@@ -30,6 +30,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -76,13 +77,13 @@ public Set<String> getShardingColumns(final String logicTableName) {
     }
 
     /**
-     * Get schema name.
+     * Find schema name.
      *
      * @param logicTableName logic table name
-     * @return schema name. nullable
+     * @return schema name
      */
-    public String getSchemaName(final LogicTableName logicTableName) {
+    public Optional<String> findSchemaName(final String logicTableName) {
         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
-        return dialectDatabaseMetaData.isSchemaAvailable() ? tableNameSchemaNameMapping.getSchemaName(logicTableName) : null;
+        return dialectDatabaseMetaData.isSchemaAvailable() ? Optional.of(tableNameSchemaNameMapping.getSchemaName(logicTableName)) : Optional.empty();
     }
 }
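
The rename from getSchemaName to findSchemaName, with the return type changed from a nullable String to Optional<String>, follows the common Java convention that find-prefixed lookups encode absence in the signature rather than in a "nullable" Javadoc note. A minimal before/after sketch of the pattern (SchemaLookup and its plain Map are hypothetical stand-ins, not the actual ShardingSphere types):

import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for ImporterConfiguration's schema lookup; the Map
// replaces tableNameSchemaNameMapping and the flag replaces isSchemaAvailable().
final class SchemaLookup {
    
    private final Map<String, String> tableToSchema = Map.of("t_order", "public");
    
    private final boolean schemaAvailable;
    
    SchemaLookup(final boolean schemaAvailable) {
        this.schemaAvailable = schemaAvailable;
    }
    
    // Before: a nullable return; every caller must remember the null check.
    String getSchemaName(final String logicTableName) {
        return schemaAvailable ? tableToSchema.get(logicTableName) : null;
    }
    
    // After: absence is part of the return type, so callers must handle it explicitly.
    Optional<String> findSchemaName(final String logicTableName) {
        return schemaAvailable ? Optional.ofNullable(tableToSchema.get(logicTableName)) : Optional.empty();
    }
}

The diff itself uses Optional.of(...) rather than Optional.ofNullable(...), since tableNameSchemaNameMapping.getSchemaName(...) is evidently expected to return a non-null value whenever the dialect supports schemas; the sketch uses ofNullable because a plain Map lookup can miss.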
Second changed file (the importer that consumes ImporterConfiguration; file name not shown in this view):

@@ -26,7 +26,6 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
@@ -169,7 +168,7 @@ private void doFlush(final DataSource dataSource, final List<DataRecord> buffer)
 
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String insertSql = importSQLBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
+        String insertSql = importSQLBuilder.buildInsertSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
         try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
             batchInsertStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
@@ -185,10 +184,6 @@ private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         }
     }
 
-    private String getSchemaName(final String logicTableName) {
-        return getImporterConfig().getSchemaName(new LogicTableName(logicTableName));
-    }
-
     private void executeUpdate(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         for (DataRecord each : dataRecords) {
             executeUpdate(connection, each);
@@ -199,7 +194,7 @@ private void executeUpdate(final Connection connection, final DataRecord dataRecord) throws SQLException {
         Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
         List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
         List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
-        String updateSql = importSQLBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
+        String updateSql = importSQLBuilder.buildUpdateSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
         try (PreparedStatement preparedStatement = connection.prepareStatement(updateSql)) {
             updateStatement.set(preparedStatement);
             for (int i = 0; i < setColumns.size(); i++) {
@@ -226,7 +221,7 @@ private void executeUpdate(final Connection connection, final DataRecord dataRecord) throws SQLException {
 
     private void executeBatchDelete(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String deleteSQL = importSQLBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), dataRecord,
+        String deleteSQL = importSQLBuilder.buildDeleteSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
                 RecordUtils.extractConditionColumns(dataRecord, importerConfig.getShardingColumns(dataRecord.getTableName())));
         try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
             batchDeleteStatement.set(preparedStatement);
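
All three call sites unwrap the result immediately with orElse(null) because buildInsertSQL, buildUpdateSQL, and buildDeleteSQL still accept a nullable schema name. A small runnable sketch of that bridging pattern (OptionalBridgeDemo and its buildInsertSQL are simplified hypotheticals, not the real SQL-builder API):

import java.util.Optional;

final class OptionalBridgeDemo {
    
    // Simplified hypothetical builder: a nullable schemaName selects between
    // "schema.table" and bare "table", mirroring the nullable parameter the real builders take.
    static String buildInsertSQL(final String schemaName, final String tableName) {
        String qualifiedTable = null == schemaName ? tableName : schemaName + "." + tableName;
        return "INSERT INTO " + qualifiedTable + " VALUES (?)";
    }
    
    public static void main(final String[] args) {
        Optional<String> present = Optional.of("public");
        Optional<String> absent = Optional.empty();
        // orElse(null) bridges the Optional-returning lookup to the nullable parameter.
        System.out.println(buildInsertSQL(present.orElse(null), "t_order")); // INSERT INTO public.t_order VALUES (?)
        System.out.println(buildInsertSQL(absent.orElse(null), "t_order"));  // INSERT INTO t_order VALUES (?)
    }
}

Keeping orElse(null) confines the nullable value to the builder boundary; everywhere else the Optional forces callers to handle the schema-less case explicitly.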
