Skip to content

Commit

Permalink
Refactor StorageUnitMetaData.dataSource (#28632)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Oct 4, 2023
1 parent 166a16e commit eb3c984
Show file tree
Hide file tree
Showing 31 changed files with 51 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private void addTableRowsAndDataLength(final Map<String, StorageUnitMetaData> me
Optional<DialectShardingStatisticsTableCollector> dialectCollector = DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class, databaseType);
boolean isAppended = false;
if (dialectCollector.isPresent()) {
try (Connection connection = metaDataMap.get(dataNode.getDataSourceName()).getDataSource().getConnection()) {
try (Connection connection = metaDataMap.get(dataNode.getDataSourceName()).getStorageUnit().getDataSource().getConnection()) {
isAppended = dialectCollector.get().appendRow(connection, dataNode, row);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ private void closeResources(final ShardingSphereDatabase database) {
database.getRuleMetaData().findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class).ifPresent(StaticDataSourceContainedRule::cleanStorageNodeDataSources);
Optional.ofNullable(database.getResourceMetaData())
.ifPresent(optional -> optional.getStorageUnitMetaDataMap().values().forEach(each -> new DataSourcePoolDestroyer(each.getDataSource()).asyncDestroy()));
.ifPresent(optional -> optional.getStorageUnitMetaDataMap().values().forEach(each -> new DataSourcePoolDestroyer(each.getStorageUnit().getDataSource()).asyncDestroy()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public synchronized void reloadRules(final Class<? extends ShardingSphereRule> r
toBeReloadedRules.stream().findFirst().ifPresent(optional -> {
databaseRules.removeAll(toBeReloadedRules);
Map<String, DataSource> dataSources = resourceMetaData.getStorageUnitMetaDataMap().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getStorageUnit().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
databaseRules.add(((MutableDataNodeRule) optional).reloadRule(ruleConfig, name, dataSources, databaseRules));
});
ruleMetaData.getRules().clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,11 @@ public final class StorageUnitMetaData {

private final DataSourcePoolProperties dataSourcePoolProperties;

private final DataSource dataSource;

private final StorageUnit storageUnit;

public StorageUnitMetaData(final String databaseName, final StorageNode storageNode, final DataSourcePoolProperties dataSourcePoolProperties, final DataSource dataSource) {
this.storageNode = storageNode;
this.dataSourcePoolProperties = dataSourcePoolProperties;
this.dataSource = dataSource;
storageUnit = new StorageUnit(databaseName, dataSource, dataSourcePoolProperties, storageNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public GenericSchemaBuilderMaterial(final DatabaseType protocolType, final Map<S
this(protocolType, storageUnitMetaDataMap.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getStorageUnit().getStorageType(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
storageUnitMetaDataMap.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getStorageUnit().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
rules, props, defaultSchemaName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static Collection<ShardingSphereRowData> collectRowData(final ShardingSph
Collection<ShardingSphereRowData> result = new LinkedList<>();
for (StorageUnitMetaData each : database.getResourceMetaData().getStorageUnitMetaDataMap().values()) {
try (
Connection connection = each.getDataSource().getConnection();
Connection connection = each.getStorageUnit().getDataSource().getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
result.addAll(getRows(table, selectedColumnNames, resultSet));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public final class DriverDatabaseConnectionManager implements DatabaseConnection

public DriverDatabaseConnectionManager(final String databaseName, final ContextManager contextManager) {
for (Entry<String, StorageUnitMetaData> entry : contextManager.getStorageUnitMetaDataMap(databaseName).entrySet()) {
DataSource dataSource = entry.getValue().getDataSource();
DataSource dataSource = entry.getValue().getStorageUnit().getDataSource();
String cacheKey = getKey(databaseName, entry.getKey());
dataSourceMap.put(cacheKey, dataSource);
physicalDataSourceMap.put(cacheKey, dataSource);
Expand Down Expand Up @@ -343,7 +343,7 @@ private List<Connection> getConnections(final String currentDatabaseName, final
String cacheKey = getKey(currentDatabaseName, dataSourceName);
DataSource dataSource = databaseName.equals(currentDatabaseName)
? dataSourceMap.get(cacheKey)
: contextManager.getStorageUnitMetaDataMap(currentDatabaseName).get(dataSourceName).getDataSource();
: contextManager.getStorageUnitMetaDataMap(currentDatabaseName).get(dataSourceName).getStorageUnit().getDataSource();
Preconditions.checkNotNull(dataSource, "Missing the data source name: '%s'", dataSourceName);
Collection<Connection> connections;
synchronized (cachedConnections) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public Connection getConnection(final String username, final String password) {
public void close() throws SQLException {
contextManagerDestroyedCallback(databaseName);
for (StorageUnitMetaData each : contextManager.getStorageUnitMetaDataMap(databaseName).values()) {
close(each.getDataSource());
close(each.getStorageUnit().getDataSource());
}
contextManager.close();
}
Expand Down Expand Up @@ -134,13 +134,13 @@ private void contextManagerDestroyedCallback(final String databaseName) {
@Override
public int getLoginTimeout() throws SQLException {
Map<String, StorageUnitMetaData> metaDataMap = contextManager.getStorageUnitMetaDataMap(databaseName);
return metaDataMap.isEmpty() ? 0 : metaDataMap.values().iterator().next().getDataSource().getLoginTimeout();
return metaDataMap.isEmpty() ? 0 : metaDataMap.values().iterator().next().getStorageUnit().getDataSource().getLoginTimeout();
}

@Override
public void setLoginTimeout(final int seconds) throws SQLException {
for (StorageUnitMetaData each : contextManager.getStorageUnitMetaDataMap(databaseName).values()) {
each.getDataSource().setLoginTimeout(seconds);
each.getStorageUnit().getDataSource().setLoginTimeout(seconds);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnitMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
Expand Down Expand Up @@ -100,9 +99,7 @@ private Map<String, StorageUnitMetaData> mockStorageUnitMetaDataMap() throws SQL

private StorageUnitMetaData mockStorageUnitMetaData(final DataSource dataSource) {
StorageUnitMetaData result = mock(StorageUnitMetaData.class, RETURNS_DEEP_STUBS);
when(result.getDataSource()).thenReturn(dataSource);
StorageUnit storageUnit = mock(StorageUnit.class);
when(result.getStorageUnit()).thenReturn(storageUnit);
when(result.getStorageUnit().getDataSource()).thenReturn(dataSource);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnitMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnitMetaData;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
Expand All @@ -34,7 +34,6 @@
import org.junit.jupiter.api.Test;
import org.mockito.internal.configuration.plugins.Plugins;

import javax.sql.DataSource;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
Expand Down Expand Up @@ -255,8 +254,6 @@ private ContextManager mockContextManager(final StorageUnit storageUnit) {
ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
StorageUnitMetaData storageUnitMetaData = mock(StorageUnitMetaData.class);
when(storageUnitMetaData.getStorageUnit()).thenReturn(storageUnit);
DataSource dataSource = storageUnit.getDataSource();
when(storageUnitMetaData.getDataSource()).thenReturn(dataSource);
when(result.getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME)).thenReturn(Collections.singletonMap("ds", storageUnitMetaData));
when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(new RuleMetaData(Arrays.asList(mockTransactionRule(), mock(TrafficRule.class))));
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ void assertNewConstructorWithAllArguments() throws Exception {
assertThat(contextManager.getClusterStateContext().getCurrentState(), is(ClusterState.OK));
assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(InstanceState.OK));
assertThat(contextManager.getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME).size(), is(1));
assertThat(contextManager.getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME).get("ds").getDataSource().getConnection().getMetaData().getURL(), is("jdbc:mock://127.0.0.1/foo_ds"));
assertThat(contextManager.getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME).get("ds").getStorageUnit().getDataSource().getConnection().getMetaData().getURL(),
is("jdbc:mock://127.0.0.1/foo_ds"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private Map<String, Collection<DataNode>> getActualDataNodes(final ShardingSpher
ResourceMetaData resourceMetaData = database.getResourceMetaData();
Map<String, DataSource> aggregateDataSourceMap = SingleTableLoadUtils.getAggregatedDataSourceMap(
resourceMetaData.getStorageUnitMetaDataMap().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getStorageUnit().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
database.getRuleMetaData().getRules());
Collection<String> excludedTables = SingleTableLoadUtils.getExcludedTables(database.getRuleMetaData().getRules());
return SingleTableDataNodeLoader.load(database.getName(), database.getProtocolType(), aggregateDataSourceMap, excludedTables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void checkActualTableExist(final ShardingSphereDatabase database, final
ResourceMetaData resourceMetaData = database.getResourceMetaData();
Map<String, DataSource> aggregateDataSourceMap = SingleTableLoadUtils.getAggregatedDataSourceMap(
resourceMetaData.getStorageUnitMetaDataMap().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getStorageUnit().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
database.getRuleMetaData().getRules());
Map<String, Map<String, Collection<String>>> actualTableNodes = new LinkedHashMap<>();
for (String each : requiredDataSources) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private synchronized ShardingSphereTransactionManagerEngine createTransactionMan
Map<String, DatabaseType> databaseTypes = new LinkedHashMap<>(databases.size(), 1F);
for (Entry<String, ShardingSphereDatabase> entry : databases.entrySet()) {
ShardingSphereDatabase database = entry.getValue();
database.getResourceMetaData().getStorageUnitMetaDataMap().forEach((key, value) -> dataSourceMap.put(database.getName() + "." + key, value.getDataSource()));
database.getResourceMetaData().getStorageUnitMetaDataMap().forEach((key, value) -> dataSourceMap.put(database.getName() + "." + key, value.getStorageUnit().getDataSource()));
database.getResourceMetaData().getStorageUnitMetaDataMap().forEach((key, value) -> databaseTypes.put(database.getName() + "." + key, value.getStorageUnit().getStorageType()));
}
if (dataSourceMap.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private ShardingSphereSchema loadSchema(final String databaseName, final String
database.reloadRules(MutableDataNodeRule.class);
GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
Collections.singletonMap(dataSourceName, database.getResourceMetaData().getStorageUnitMetaDataMap().get(dataSourceName).getStorageUnit().getStorageType()),
Collections.singletonMap(dataSourceName, database.getResourceMetaData().getStorageUnitMetaDataMap().get(dataSourceName).getDataSource()),
Collections.singletonMap(dataSourceName, database.getResourceMetaData().getStorageUnitMetaDataMap().get(dataSourceName).getStorageUnit().getDataSource()),
database.getRuleMetaData().getRules(), metaDataContexts.get().getMetaData().getProps(), schemaName);
ShardingSphereSchema result = GenericSchemaBuilder.build(material).get(schemaName);
result.getViews().putAll(metaDataContexts.get().getPersistService().getDatabaseMetaDataService().getViewMetaDataPersistService().load(database.getName(), schemaName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public synchronized void alterRuleConfiguration(final String databaseName, final
rules.removeIf(each -> each.getConfiguration().getClass().isAssignableFrom(ruleConfig.getClass()));
rules.addAll(DatabaseRulesBuilder.build(databaseName,
database.getResourceMetaData().getStorageUnitMetaDataMap().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getStorageUnit().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
database.getRuleMetaData().getRules(), ruleConfig, instanceContext));
refreshMetadata(databaseName, database, rules);
} catch (final SQLException ex) {
Expand All @@ -198,7 +198,7 @@ public synchronized void dropRuleConfiguration(final String databaseName, final
if (isNotEmptyConfig(ruleConfig)) {
rules.addAll(DatabaseRulesBuilder.build(databaseName,
database.getResourceMetaData().getStorageUnitMetaDataMap().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getStorageUnit().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
database.getRuleMetaData().getRules(), ruleConfig, instanceContext));
}
refreshMetadata(databaseName, database, rules);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private static void persistDatabaseConfigurations(final MetaDataContexts metadat
String databaseName = entry.getKey();
metadataContexts.getPersistService().persistConfigurations(entry.getKey(), entry.getValue(),
metadataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnitMetaDataMap().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, each -> each.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
.collect(Collectors.toMap(Entry::getKey, each -> each.getValue().getStorageUnit().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
metadataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private static void persistDatabaseConfigurations(final MetaDataContexts metadat
String databaseName = entry.getKey();
metadataContexts.getPersistService().persistConfigurations(entry.getKey(), entry.getValue(),
metadataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnitMetaDataMap().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, each -> each.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
.collect(Collectors.toMap(Entry::getKey, each -> each.getValue().getStorageUnit().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
metadataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public List<Connection> getConnections(final String databaseName, final String d
public List<Connection> getConnections(final String databaseName, final String dataSourceName,
final int connectionSize, final ConnectionMode connectionMode, final TransactionType transactionType) throws SQLException {
DataSource dataSource = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData()
.getDatabase(databaseName).getResourceMetaData().getStorageUnitMetaDataMap().get(dataSourceName).getDataSource();
.getDatabase(databaseName).getResourceMetaData().getStorageUnitMetaDataMap().get(dataSourceName).getStorageUnit().getDataSource();
if (dataSourceName.contains(".")) {
String dataSourceStr = dataSourceName.split("\\.")[0];
if (GlobalDataSourceRegistry.getInstance().getCachedDataSources().containsKey(dataSourceStr)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ protected void processMetaData(final String databaseName, final Consumer<ResultS
return;
}
try (
Connection connection = storageUnitMetaData.get().getDataSource().getConnection();
Connection connection = storageUnitMetaData.get().getStorageUnit().getDataSource().getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
for (int i = 0; i < parameters.size(); i++) {
preparedStatement.setObject(i + 1, parameters.get(i));
Expand Down
Loading

0 comments on commit eb3c984

Please sign in to comment.