Skip to content

Commit

Permalink
Refactor DataSourceCheckEngine (#29462)
Browse files Browse the repository at this point in the history
* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine
  • Loading branch information
terrymanu authored Dec 19, 2023
1 parent 2838da8 commit 41eaef7
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;

/**
* Data source check engine.
Expand All @@ -47,37 +47,13 @@ public DataSourceCheckEngine(final DatabaseType databaseType) {
sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
}

/**
 * Check source data source.
 *
 * @param dataSource to be checked source data source
 */
public void checkSourceDataSource(final DataSource dataSource) {
    // Wrap the single data source once and reuse it for all three checks.
    Collection<DataSource> singleDataSource = Collections.singleton(dataSource);
    checkConnection(singleDataSource);
    checkPrivilege(singleDataSource);
    checkVariable(singleDataSource);
}

/**
 * Check target data source.
 *
 * @param dataSource to be checked target data source
 * @param importerConfig importer configuration
 */
public void checkTargetDataSource(final DataSource dataSource, final ImporterConfiguration importerConfig) {
    // Singleton views are immutable, so building one per check is equivalent to sharing one.
    checkConnection(Collections.singleton(dataSource));
    checkTargetTable(Collections.singleton(dataSource), importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
}

/**
* Check data source connections.
*
* @param dataSources data sources
* @throws PrepareJobWithInvalidConnectionException prepare job with invalid connection exception
*/
public void checkConnection(final Collection<? extends DataSource> dataSources) {
public void checkConnection(final Collection<DataSource> dataSources) {
try {
for (DataSource each : dataSources) {
each.getConnection().close();
Expand All @@ -88,22 +64,38 @@ public void checkConnection(final Collection<? extends DataSource> dataSources)
}

/**
* Check table is empty.
* Check source data source.
*
* @param dataSources to be checked source data source
*/
public void checkSourceDataSource(final Collection<DataSource> dataSources) {
    checkConnection(dataSources);
    // The dialect checker is optional; without it only connectivity is verified.
    if (null != checker) {
        for (DataSource each : dataSources) {
            checker.checkPrivilege(each);
        }
        for (DataSource each : dataSources) {
            checker.checkVariable(each);
        }
    }
}

/**
* Check target data sources.
*
* @param dataSources data sources
* @param tableAndSchemaNameMapper mapping
* @param logicTableNames logic table names
* @throws PrepareJobWithInvalidConnectionException prepare job with invalid connection exception
* @param dataSources to be checked target data sources
* @param importerConfig importer configuration
*/
public void checkTargetDataSources(final Collection<DataSource> dataSources, final ImporterConfiguration importerConfig) {
    checkConnection(dataSources);
    // Pull the mapping and table names out of the importer configuration before the emptiness check.
    TableAndSchemaNameMapper tableAndSchemaNameMapper = importerConfig.getTableAndSchemaNameMapper();
    Collection<String> logicTableNames = importerConfig.getLogicTableNames();
    checkTargetTable(dataSources, tableAndSchemaNameMapper, logicTableNames);
}

// TODO rename to common usage name
// TODO Merge schemaName and tableNames
public void checkTargetTable(final Collection<? extends DataSource> dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final Collection<String> logicTableNames) {
private void checkTargetTable(final Collection<DataSource> dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final Collection<String> logicTableNames) {
try {
for (DataSource each : dataSources) {
for (String tableName : logicTableNames) {
if (!checkEmpty(each, tableAndSchemaNameMapper.getSchemaName(tableName), tableName)) {
throw new PrepareJobWithTargetTableNotEmptyException(tableName);
}
ShardingSpherePreconditions.checkState(checkEmpty(each, tableAndSchemaNameMapper.getSchemaName(tableName), tableName),
() -> new PrepareJobWithTargetTableNotEmptyException(tableName));
}
}
} catch (final SQLException ex) {
Expand All @@ -120,32 +112,4 @@ private boolean checkEmpty(final DataSource dataSource, final String schemaName,
return !resultSet.next();
}
}

/**
 * Check user privileges.
 *
 * @param dataSources data sources
 */
public void checkPrivilege(final Collection<? extends DataSource> dataSources) {
    // No-op when no dialect checker is configured for this database type.
    if (null != checker) {
        dataSources.forEach(checker::checkPrivilege);
    }
}

/**
 * Check data source variables.
 *
 * @param dataSources data sources
 */
public void checkVariable(final Collection<? extends DataSource> dataSources) {
    // No-op when no dialect checker is configured for this database type.
    if (null != checker) {
        dataSources.forEach(checker::checkVariable);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;

import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
Expand All @@ -39,6 +40,7 @@
import java.util.LinkedList;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -82,20 +84,25 @@ void assertCheckConnectionFailed() throws SQLException {
}

@Test
void assertCheckTargetTable() throws SQLException {
void assertCheckTargetDataSources() throws SQLException {
    // Stub the JDBC chain so the emptiness query returns an empty result set.
    when(dataSource.getConnection()).thenReturn(connection);
    when(connection.prepareStatement("SELECT * FROM t_order LIMIT 1")).thenReturn(preparedStatement);
    when(preparedStatement.executeQuery()).thenReturn(resultSet);
    ImporterConfiguration config = mock(ImporterConfiguration.class);
    when(config.getLogicTableNames()).thenReturn(Collections.singleton("t_order"));
    when(config.getTableAndSchemaNameMapper()).thenReturn(new TableAndSchemaNameMapper(Collections.emptyMap()));
    dataSourceCheckEngine.checkTargetDataSources(dataSources, config);
}

@Test
void assertCheckTargetTableFailed() throws SQLException {
void assertCheckTargetDataSourcesFailed() throws SQLException {
    // Stub the JDBC chain so the emptiness query reports an existing row.
    when(dataSource.getConnection()).thenReturn(connection);
    when(connection.prepareStatement("SELECT * FROM t_order LIMIT 1")).thenReturn(preparedStatement);
    when(preparedStatement.executeQuery()).thenReturn(resultSet);
    when(resultSet.next()).thenReturn(true);
    ImporterConfiguration config = mock(ImporterConfiguration.class);
    when(config.getLogicTableNames()).thenReturn(Collections.singleton("t_order"));
    when(config.getTableAndSchemaNameMapper()).thenReturn(new TableAndSchemaNameMapper(Collections.emptyMap()));
    // A non-empty target table must abort job preparation.
    assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () -> dataSourceCheckEngine.checkTargetDataSources(dataSources, config));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@

import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
Expand All @@ -101,7 +102,7 @@ public void prepare(final MigrationJobItemContext jobItemContext) throws SQLExce
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(jobItemContext.getSourceDataSource());
new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
PipelineJobRegistry.stop(jobItemContext.getJobId());
return;
Expand Down Expand Up @@ -163,7 +164,7 @@ private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext)
}
if (null == jobItemContext.getInitProgress()) {
PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
new DataSourceCheckEngine(targetDatabaseType).checkTargetDataSource(targetDataSource, jobItemContext.getTaskConfig().getImporterConfig());
new DataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource), jobItemContext.getTaskConfig().getImporterConfig());
}
}

Expand Down

0 comments on commit 41eaef7

Please sign in to comment.