diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
index a92bce3830482..a723f8b638fe4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
@@ -30,6 +30,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -76,13 +77,13 @@ public Set<String> getShardingColumns(final String logicTableName) {
     }
 
     /**
-     * Get schema name.
+     * Find schema name.
      *
      * @param logicTableName logic table name
-     * @return schema name. nullable
+     * @return schema name
      */
-    public String getSchemaName(final LogicTableName logicTableName) {
+    public Optional<String> findSchemaName(final String logicTableName) {
         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
-        return dialectDatabaseMetaData.isSchemaAvailable() ? tableNameSchemaNameMapping.getSchemaName(logicTableName) : null;
+        return dialectDatabaseMetaData.isSchemaAvailable() ? Optional.of(tableNameSchemaNameMapping.getSchemaName(logicTableName)) : Optional.empty();
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 0995292e0dbb9..fe3134354bdb1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -26,7 +26,6 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
@@ -169,7 +168,7 @@ private void doFlush(final DataSource dataSource, final List<DataRecord> buffer)
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String insertSql = importSQLBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
+        String insertSql = importSQLBuilder.buildInsertSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
         try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
             batchInsertStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
@@ -185,10 +184,6 @@ private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         for (DataRecord each : dataRecords) {
             executeUpdate(connection, each);
         }
     }
 
-    private String getSchemaName(final String tableName) {
-        return getImporterConfig().getSchemaName(new LogicTableName(tableName));
-    }
-
     private void executeUpdate(final Connection connection, final DataRecord dataRecord) throws SQLException {
@@ -199,7 +194,7 @@ private void executeUpdate(final Connection connection, final DataRecord dataRec
         Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
         List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
         List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
-        String updateSql = importSQLBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
+        String updateSql = importSQLBuilder.buildUpdateSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
         try (PreparedStatement preparedStatement = connection.prepareStatement(updateSql)) {
             updateStatement.set(preparedStatement);
             for (int i = 0; i < setColumns.size(); i++) {
@@ -226,7 +221,7 @@ private void executeUpdate(final Connection connection, final DataRecord dataRec
     private void executeBatchDelete(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String deleteSQL = importSQLBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), dataRecord,
+        String deleteSQL = importSQLBuilder.buildDeleteSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
                 RecordUtils.extractConditionColumns(dataRecord, importerConfig.getShardingColumns(dataRecord.getTableName())));
         try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
             batchDeleteStatement.set(preparedStatement);