From 41eaef725c5705633c5abfc0d62e8356065188ba Mon Sep 17 00:00:00 2001
From: Liang Zhang <zhangliang@apache.org>
Date: Wed, 20 Dec 2023 00:26:42 +0800
Subject: [PATCH] Refactor DataSourceCheckEngine (#29462)

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine
---
 .../core/checker/DataSourceCheckEngine.java   | 90 ++++------
 .../datasource/DataSourceCheckEngineTest.java | 17 ++--
 .../preparer/MigrationJobPreparer.java        |  5 +-
 3 files changed, 42 insertions(+), 70 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index f9e48d9a22465..e77594143223c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -24,6 +24,7 @@
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -31,7 +32,6 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Collections;
 
 /**
  * Data source check engine.
@@ -47,37 +47,13 @@ public DataSourceCheckEngine(final DatabaseType databaseType) {
         sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
     }
 
-    /**
-     * Check source data source.
-     *
-     * @param dataSource to be checked source data source
-     */
-    public void checkSourceDataSource(final DataSource dataSource) {
-        Collection<DataSource> dataSources = Collections.singleton(dataSource);
-        checkConnection(dataSources);
-        checkPrivilege(dataSources);
-        checkVariable(dataSources);
-    }
-
-    /**
-     * Check target data source.
-     *
-     * @param dataSource to be checked target data source
-     * @param importerConfig importer configuration
-     */
-    public void checkTargetDataSource(final DataSource dataSource, final ImporterConfiguration importerConfig) {
-        Collection<DataSource> dataSources = Collections.singleton(dataSource);
-        checkConnection(dataSources);
-        checkTargetTable(dataSources, importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
-    }
-
     /**
      * Check data source connections.
      *
      * @param dataSources data sources
      * @throws PrepareJobWithInvalidConnectionException prepare job with invalid connection exception
      */
-    public void checkConnection(final Collection<DataSource> dataSources) {
+    public void checkConnection(final Collection<? extends DataSource> dataSources) {
         try {
             for (DataSource each : dataSources) {
                 each.getConnection().close();
@@ -88,22 +64,38 @@ public void checkConnection(final Collection<DataSource> dataSources) {
     }
 
     /**
-     * Check table is empty.
+     * Check source data source.
+     *
+     * @param dataSources to be checked source data source
+     */
+    public void checkSourceDataSource(final Collection<? extends DataSource> dataSources) {
+        checkConnection(dataSources);
+        if (null == checker) {
+            return;
+        }
+        dataSources.forEach(checker::checkPrivilege);
+        dataSources.forEach(checker::checkVariable);
+    }
+
+    /**
+     * Check target data sources.
      *
-     * @param dataSources data sources
-     * @param tableAndSchemaNameMapper mapping
-     * @param logicTableNames logic table names
-     * @throws PrepareJobWithInvalidConnectionException prepare job with invalid connection exception
+     * @param dataSources to be checked target data sources
+     * @param importerConfig importer configuration
      */
+    public void checkTargetDataSources(final Collection<? extends DataSource> dataSources, final ImporterConfiguration importerConfig) {
+        checkConnection(dataSources);
+        checkTargetTable(dataSources, importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
+    }
+
     // TODO rename to common usage name
     // TODO Merge schemaName and tableNames
-    public void checkTargetTable(final Collection<DataSource> dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final Collection<String> logicTableNames) {
+    private void checkTargetTable(final Collection<? extends DataSource> dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final Collection<String> logicTableNames) {
         try {
             for (DataSource each : dataSources) {
                 for (String tableName : logicTableNames) {
-                    if (!checkEmpty(each, tableAndSchemaNameMapper.getSchemaName(tableName), tableName)) {
-                        throw new PrepareJobWithTargetTableNotEmptyException(tableName);
-                    }
+                    ShardingSpherePreconditions.checkState(checkEmpty(each, tableAndSchemaNameMapper.getSchemaName(tableName), tableName),
+                            () -> new PrepareJobWithTargetTableNotEmptyException(tableName));
                 }
             }
         } catch (final SQLException ex) {
@@ -120,32 +112,4 @@ private boolean checkEmpty(final DataSource dataSource, final String schemaName,
             return !resultSet.next();
         }
     }
-
-    /**
-     * Check user privileges.
-     *
-     * @param dataSources data sources
-     */
-    public void checkPrivilege(final Collection<DataSource> dataSources) {
-        if (null == checker) {
-            return;
-        }
-        for (DataSource each : dataSources) {
-            checker.checkPrivilege(each);
-        }
-    }
-
-    /**
-     * Check data source variables.
-     *
-     * @param dataSources data sources
-     */
-    public void checkVariable(final Collection<DataSource> dataSources) {
-        if (null == checker) {
-            return;
-        }
-        for (DataSource each : dataSources) {
-            checker.checkVariable(each);
-        }
-    }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
index f117cf933dd5c..16bf462303f76 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
 import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
+import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
@@ -39,6 +40,7 @@
 import java.util.LinkedList;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -82,20 +84,25 @@ void assertCheckConnectionFailed() throws SQLException {
     }
 
     @Test
-    void assertCheckTargetTable() throws SQLException {
+    void assertCheckTargetDataSources() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        dataSourceCheckEngine.checkTargetTable(dataSources, new TableAndSchemaNameMapper(Collections.emptyMap()), Collections.singletonList("t_order"));
+        ImporterConfiguration importerConfig = mock(ImporterConfiguration.class);
+        when(importerConfig.getTableAndSchemaNameMapper()).thenReturn(new TableAndSchemaNameMapper(Collections.emptyMap()));
+        when(importerConfig.getLogicTableNames()).thenReturn(Collections.singleton("t_order"));
+        dataSourceCheckEngine.checkTargetDataSources(dataSources, importerConfig);
     }
 
     @Test
-    void assertCheckTargetTableFailed() throws SQLException {
+    void assertCheckTargetDataSourcesFailed() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         when(resultSet.next()).thenReturn(true);
-        assertThrows(PrepareJobWithTargetTableNotEmptyException.class,
-                () -> dataSourceCheckEngine.checkTargetTable(dataSources, new TableAndSchemaNameMapper(Collections.emptyMap()), Collections.singletonList("t_order")));
+        ImporterConfiguration importerConfig = mock(ImporterConfiguration.class);
+        when(importerConfig.getTableAndSchemaNameMapper()).thenReturn(new TableAndSchemaNameMapper(Collections.emptyMap()));
+        when(importerConfig.getLogicTableNames()).thenReturn(Collections.singleton("t_order"));
+        assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () -> dataSourceCheckEngine.checkTargetDataSources(dataSources, importerConfig));
     }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index db5e64077980f..9087424ba82d0 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -76,6 +76,7 @@
 
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
@@ -101,7 +102,7 @@ public void prepare(final MigrationJobItemContext jobItemContext) throws SQLExce
                 jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
         DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
-        new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(jobItemContext.getSourceDataSource());
+        new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
             PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
@@ -163,7 +164,7 @@ private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext)
         }
         if (null == jobItemContext.getInitProgress()) {
             PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
-            new DataSourceCheckEngine(targetDatabaseType).checkTargetDataSource(targetDataSource, jobItemContext.getTaskConfig().getImporterConfig());
+            new DataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource), jobItemContext.getTaskConfig().getImporterConfig());
         }
     }
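
Note: after this refactor, both check entry points accept a collection of data sources, so callers holding a single data source wrap it in Collections.singleton(...), as MigrationJobPreparer does above. A minimal caller sketch, not part of the patch — the HikariCP pool, the connection settings, and the TypedSPILoader lookup are illustrative assumptions:

    import com.zaxxer.hikari.HikariDataSource;

    import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
    import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
    import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

    import java.util.Collections;

    public final class DataSourceCheckExample {

        public static void main(final String[] args) {
            // Illustrative pool setup; any javax.sql.DataSource implementation works.
            HikariDataSource dataSource = new HikariDataSource();
            dataSource.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/ds_0");
            dataSource.setUsername("root");
            dataSource.setPassword("root");
            // checkSourceDataSource verifies connectivity first, then privileges and
            // variables when a dialect checker exists for the database type; it throws
            // PrepareJobWithInvalidConnectionException if a connection cannot be opened.
            DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
            new DataSourceCheckEngine(databaseType).checkSourceDataSource(Collections.singleton(dataSource));
            dataSource.close();
        }
    }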