Refactor ResourceMetaData #28640

Merged · 4 commits · Oct 5, 2023
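
What the diff shows: the per-data-source facade methods ResourceMetaData.getConnectionProperties(String) and ResourceMetaData.getStorageType(String) are removed, and every caller now resolves the StorageUnit from getStorageUnits() and reads the connection properties and storage type off it. A minimal sketch of the call-site migration, assuming only the accessors visible in this diff (the helper class and method are illustrative, not part of the PR):

import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;

final class StorageUnitAccessSketch {
    
    static String describe(final ResourceMetaData resourceMetaData, final String dataSourceName) {
        // Before: resourceMetaData.getConnectionProperties(dataSourceName)
        //         and resourceMetaData.getStorageType(dataSourceName).
        // After: resolve the StorageUnit once, then read both values from it.
        StorageUnit storageUnit = resourceMetaData.getStorageUnits().get(dataSourceName);
        ConnectionProperties connectionProps = storageUnit.getConnectionProperties();
        DatabaseType storageType = storageUnit.getStorageType();
        return storageType.getType() + "@" + connectionProps.getHostname() + ":" + connectionProps.getPort();
    }
}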
@@ -41,8 +41,8 @@ public abstract class TracingJDBCExecutorCallbackAdvice<T> implements InstanceMe
    public final void beforeMethod(final TargetAdviceObject target, final Method method, final Object[] args, final String pluginType) {
        JDBCExecutionUnit executionUnit = (JDBCExecutionUnit) args[0];
        ResourceMetaData resourceMetaData = AgentReflectionUtils.getFieldValue(target, "resourceMetaData");
-       ConnectionProperties connectionProps = resourceMetaData.getConnectionProperties(executionUnit.getExecutionUnit().getDataSourceName());
-       DatabaseType storageType = resourceMetaData.getStorageType(executionUnit.getExecutionUnit().getDataSourceName());
+       ConnectionProperties connectionProps = resourceMetaData.getStorageUnits().get(executionUnit.getExecutionUnit().getDataSourceName()).getConnectionProperties();
+       DatabaseType storageType = resourceMetaData.getStorageUnits().get(executionUnit.getExecutionUnit().getDataSourceName()).getStorageType();
        recordExecuteInfo(RootSpanContext.get(), target, executionUnit, (boolean) args[1], connectionProps, storageType);
    }
@@ -57,6 +57,7 @@

 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

@@ -95,9 +96,9 @@ private void prepare() {
        when(connection.getMetaData()).thenReturn(databaseMetaData);
        when(statement.getConnection()).thenReturn(connection);
        executionUnit = new JDBCExecutionUnit(new ExecutionUnit(DATA_SOURCE_NAME, new SQLUnit(SQL, Collections.emptyList())), null, statement);
-       ResourceMetaData resourceMetaData = mock(ResourceMetaData.class);
-       when(resourceMetaData.getStorageType(DATA_SOURCE_NAME)).thenReturn(TypedSPILoader.getService(DatabaseType.class, "MySQL"));
-       when(resourceMetaData.getConnectionProperties(DATA_SOURCE_NAME)).thenReturn(mock(ConnectionProperties.class));
+       ResourceMetaData resourceMetaData = mock(ResourceMetaData.class, RETURNS_DEEP_STUBS);
+       when(resourceMetaData.getStorageUnits().get(DATA_SOURCE_NAME).getStorageType()).thenReturn(TypedSPILoader.getService(DatabaseType.class, "MySQL"));
+       when(resourceMetaData.getStorageUnits().get(DATA_SOURCE_NAME).getConnectionProperties()).thenReturn(mock(ConnectionProperties.class));
        JDBCExecutorCallback jdbcExecutorCallback = new JDBCExecutorCallbackFixture(TypedSPILoader.getService(DatabaseType.class, "MySQL"), resourceMetaData, new MySQLSelectStatement(), true);
        Plugins.getMemberAccessor().set(JDBCExecutorCallback.class.getDeclaredField("resourceMetaData"), jdbcExecutorCallback, resourceMetaData);
        targetObject = (TargetAdviceObject) jdbcExecutorCallback;
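
A note on the mock change above: switching to RETURNS_DEEP_STUBS is what makes the chained stubbing when(resourceMetaData.getStorageUnits().get(...)...) possible, since each intermediate call returns an auto-created mock and only the final value needs explicit stubbing. A minimal, self-contained sketch of the idea (the "foo_ds" name and the surrounding class are illustrative):

import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;

class DeepStubsSketch {
    
    void stubChainedLookup() {
        // With a plain mock, getStorageUnits() would return Mockito's default (an empty map),
        // so get("foo_ds") would be null and the chained call would throw a NullPointerException.
        ResourceMetaData resourceMetaData = mock(ResourceMetaData.class, RETURNS_DEEP_STUBS);
        when(resourceMetaData.getStorageUnits().get("foo_ds").getStorageType())
                .thenReturn(mock(DatabaseType.class));
    }
}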
@@ -18,14 +18,12 @@
 package org.apache.shardingsphere.infra.metadata.database.resource;

 import lombok.Getter;
-import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.datasource.pool.CatalogSwitchableDataSource;
 import org.apache.shardingsphere.infra.datasource.pool.props.creator.DataSourcePoolPropertiesCreator;
 import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.metadata.database.resource.node.StorageNode;
-import org.apache.shardingsphere.infra.metadata.database.resource.node.StorageNodeName;
 import org.apache.shardingsphere.infra.metadata.database.resource.node.StorageNodeAggregator;
+import org.apache.shardingsphere.infra.metadata.database.resource.node.StorageNodeName;
 import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnitNodeMapUtils;

@@ -91,28 +89,7 @@ public Collection<String> getAllInstanceDataSourceNames() {
    }

    private boolean isExisted(final String dataSourceName, final Collection<String> existedDataSourceNames) {
-       return existedDataSourceNames.stream().anyMatch(each -> storageUnits.get(dataSourceName).getConnectionProperties()
-               .isInSameDatabaseInstance(storageUnits.get(each).getConnectionProperties()));
-   }
-
-   /**
-    * Get connection properties.
-    *
-    * @param dataSourceName data source name
-    * @return connection properties
-    */
-   public ConnectionProperties getConnectionProperties(final String dataSourceName) {
-       return storageUnits.get(dataSourceName).getConnectionProperties();
-   }
-
-   /**
-    * Get storage type.
-    *
-    * @param dataSourceName data source name
-    * @return storage type
-    */
-   public DatabaseType getStorageType(final String dataSourceName) {
-       return storageUnits.get(dataSourceName).getStorageType();
+       return existedDataSourceNames.stream().anyMatch(each -> storageUnits.get(dataSourceName).getConnectionProperties().isInSameDatabaseInstance(storageUnits.get(each).getConnectionProperties()));
    }

    /**
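
For readers tracking the now single-line isExisted: it reports whether the given data source points at the same physical database instance as one of the already-registered data sources. An equivalent, explicitly iterative form is sketched below; it assumes the surrounding ResourceMetaData class with its storageUnits field, and note that naming the ConnectionProperties type here would require the import this PR removes, which the committed one-liner avoids:

private boolean isExisted(final String dataSourceName, final Collection<String> existedDataSourceNames) {
    ConnectionProperties target = storageUnits.get(dataSourceName).getConnectionProperties();
    for (String each : existedDataSourceNames) {
        // isInSameDatabaseInstance compares the physical instance behind the two connections.
        if (target.isInSameDatabaseInstance(storageUnits.get(each).getConnectionProperties())) {
            return true;
        }
    }
    return false;
}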
@@ -74,8 +74,8 @@ public final Collection<T> execute(final Collection<JDBCExecutionUnit> execution
     */
    private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTrunkThread) throws SQLException {
        SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
-       DatabaseType storageType = resourceMetaData.getStorageType(jdbcExecutionUnit.getExecutionUnit().getDataSourceName());
-       ConnectionProperties connectionProps = resourceMetaData.getConnectionProperties(jdbcExecutionUnit.getExecutionUnit().getDataSourceName());
+       DatabaseType storageType = resourceMetaData.getStorageUnits().get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName()).getStorageType();
+       ConnectionProperties connectionProps = resourceMetaData.getStorageUnits().get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName()).getConnectionProperties();
        SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
        try {
            SQLUnit sqlUnit = jdbcExecutionUnit.getExecutionUnit().getSqlUnit();
@@ -66,7 +66,7 @@ void setUp() {
    void assertExecuteFailedAndProtocolTypeDifferentWithDatabaseType() throws SQLException {
        Object saneResult = new Object();
        ResourceMetaData resourceMetaData = mock(ResourceMetaData.class);
-       when(resourceMetaData.getStorageType("ds")).thenReturn(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL"));
+       when(resourceMetaData.getStorageUnits().get("ds").getStorageType()).thenReturn(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL"));
        JDBCExecutorCallback<Object> callback =
                new JDBCExecutorCallback<Object>(TypedSPILoader.getService(DatabaseType.class, "MySQL"), resourceMetaData, mock(SelectStatement.class), true) {

@@ -87,7 +87,7 @@ protected Optional<Object> getSaneResult(final SQLStatement sqlStatement, final
    @Test
    void assertExecuteSQLExceptionOccurredAndProtocolTypeSameAsDatabaseType() {
        ResourceMetaData resourceMetaData = mock(ResourceMetaData.class);
-       when(resourceMetaData.getStorageType("ds")).thenReturn(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL"));
+       when(resourceMetaData.getStorageUnits().get("ds").getStorageType()).thenReturn(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL"));
        JDBCExecutorCallback<Object> callback =
                new JDBCExecutorCallback<Object>(TypedSPILoader.getService(DatabaseType.class, "MySQL"), resourceMetaData, mock(SelectStatement.class), true) {
@@ -223,13 +223,13 @@ private ResultSet createDatabaseMetaDataResultSet(final ResultSet resultSet) thr

    private String getActualCatalog(final String catalog) {
        ConnectionProperties connectionProps = connection.getContextManager()
-               .getMetaDataContexts().getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getConnectionProperties(getDataSourceName());
+               .getMetaDataContexts().getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageUnits().get(getDataSourceName()).getConnectionProperties();
        return null == catalog || !catalog.contains(DefaultDatabase.LOGIC_NAME) ? catalog : connectionProps.getCatalog();
    }

    private String getActualSchema(final String schema) {
        ConnectionProperties connectionProps = connection.getContextManager()
-               .getMetaDataContexts().getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getConnectionProperties(getDataSourceName());
+               .getMetaDataContexts().getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageUnits().get(getDataSourceName()).getConnectionProperties();
        return null == schema || !schema.contains(DefaultDatabase.LOGIC_NAME) ? schema : connectionProps.getSchema();
    }
@@ -52,8 +52,8 @@ public SwitchingResource registerStorageUnit(final ResourceMetaData resourceMeta
                .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSourcePoolProperties(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)));
        mergedPropsMap.putAll(storageUnitDataSourcePoolPropsMap);
        Map<String, StorageNode> toBeCreatedStorageUintNodeMap = StorageUnitNodeMapUtils.fromDataSourcePoolProperties(storageUnitDataSourcePoolPropsMap);
-       return new SwitchingResource(resourceMetaData,
-               getRegisterNewStorageResource(resourceMetaData, toBeCreatedStorageUintNodeMap, StorageNodeAggregator.aggregateDataSourcePoolProperties(storageUnitDataSourcePoolPropsMap)),
+       Map<StorageNodeName, DataSourcePoolProperties> dataSourcePoolPropsMap = StorageNodeAggregator.aggregateDataSourcePoolProperties(storageUnitDataSourcePoolPropsMap);
+       return new SwitchingResource(resourceMetaData, getRegisterNewStorageResource(resourceMetaData, toBeCreatedStorageUintNodeMap, dataSourcePoolPropsMap),
                new StorageResource(Collections.emptyMap(), Collections.emptyMap()), mergedPropsMap);
    }

@@ -77,13 +77,13 @@ private StorageResource getRegisterNewStorageResource(final ResourceMetaData res
     * @return created switching resource
     */
    public SwitchingResource alterStorageUnit(final ResourceMetaData resourceMetaData, final Map<String, DataSourcePoolProperties> propsMap) {
-       Map<String, DataSourcePoolProperties> mergedDataSourcePoolPropertiesMap = new LinkedHashMap<>(resourceMetaData.getStorageUnits().entrySet().stream()
+       Map<String, DataSourcePoolProperties> mergedDataSourcePoolPropsMap = new LinkedHashMap<>(resourceMetaData.getStorageUnits().entrySet().stream()
                .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSourcePoolProperties(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)));
-       mergedDataSourcePoolPropertiesMap.putAll(propsMap);
-       Map<String, StorageNode> toBeAlteredStorageUintNodeMap = StorageUnitNodeMapUtils.fromDataSourcePoolProperties(mergedDataSourcePoolPropertiesMap);
-       return new SwitchingResource(resourceMetaData,
-               getAlterNewStorageResource(toBeAlteredStorageUintNodeMap, StorageNodeAggregator.aggregateDataSourcePoolProperties(mergedDataSourcePoolPropertiesMap)),
-               getStaleStorageResource(resourceMetaData, toBeAlteredStorageUintNodeMap), mergedDataSourcePoolPropertiesMap);
+       mergedDataSourcePoolPropsMap.putAll(propsMap);
+       Map<String, StorageNode> toBeAlteredStorageUintNodeMap = StorageUnitNodeMapUtils.fromDataSourcePoolProperties(mergedDataSourcePoolPropsMap);
+       Map<StorageNodeName, DataSourcePoolProperties> dataSourcePoolPropsMap = StorageNodeAggregator.aggregateDataSourcePoolProperties(mergedDataSourcePoolPropsMap);
+       return new SwitchingResource(resourceMetaData, getAlterNewStorageResource(toBeAlteredStorageUintNodeMap, dataSourcePoolPropsMap),
+               getStaleStorageResource(resourceMetaData, toBeAlteredStorageUintNodeMap), mergedDataSourcePoolPropsMap);
    }

    private StorageResource getAlterNewStorageResource(final Map<String, StorageNode> storageUintNodeMap, final Map<StorageNodeName, DataSourcePoolProperties> dataSourcePoolPropsMap) {
@@ -53,9 +53,10 @@ public SwitchingResource create(final ResourceMetaData resourceMetaData, final M
                .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSourcePoolProperties(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)));
        mergedPropsMap.putAll(toBeChangedPropsMap);
        Map<String, StorageNode> toBeChangedStorageUnitNodeMap = StorageUnitNodeMapUtils.fromDataSourcePoolProperties(toBeChangedPropsMap);
-       return new SwitchingResource(resourceMetaData,
-               createNewStorageResource(resourceMetaData, toBeChangedStorageUnitNodeMap, StorageNodeAggregator.aggregateDataSourcePoolProperties(toBeChangedPropsMap)),
-               getStaleDataSources(resourceMetaData, toBeChangedStorageUnitNodeMap, mergedPropsMap), mergedPropsMap);
+       Map<StorageNodeName, DataSourcePoolProperties> dataSourcePoolPropsMap = StorageNodeAggregator.aggregateDataSourcePoolProperties(toBeChangedPropsMap);
+       StorageResource newStorageResource = createNewStorageResource(resourceMetaData, toBeChangedStorageUnitNodeMap, dataSourcePoolPropsMap);
+       StorageResource staleDataSources = getStaleDataSources(resourceMetaData, toBeChangedStorageUnitNodeMap, mergedPropsMap);
+       return new SwitchingResource(resourceMetaData, newStorageResource, staleDataSources, mergedPropsMap);
    }

    /**

@@ -92,16 +93,15 @@ public SwitchingResource createByAlterDataSourcePoolProperties(final ResourceMet
        staleStorageResource.getDataSources().putAll(getToBeDeletedDataSources(resourceMetaData.getDataSources(), toBeChangedStorageNodeNames));
        staleStorageResource.getStorageUnitNodeMap().putAll(
                getToBeDeletedStorageUnitNodeMap(resourceMetaData.getStorageUnits(), toBeChangedStorageUnitNodeMap.keySet()));
-       return new SwitchingResource(resourceMetaData,
-               createNewStorageResource(resourceMetaData, toBeChangedStorageUnitNodeMap, StorageNodeAggregator.aggregateDataSourcePoolProperties(toBeChangedPropsMap)),
-               staleStorageResource, mergedDataSourcePoolPropertiesMap);
+       Map<StorageNodeName, DataSourcePoolProperties> dataSourcePoolPropsMap = StorageNodeAggregator.aggregateDataSourcePoolProperties(toBeChangedPropsMap);
+       return new SwitchingResource(
+               resourceMetaData, createNewStorageResource(resourceMetaData, toBeChangedStorageUnitNodeMap, dataSourcePoolPropsMap), staleStorageResource, mergedDataSourcePoolPropertiesMap);
    }

    private StorageResource createNewStorageResource(final ResourceMetaData resourceMetaData,
                                                     final Map<String, StorageNode> toBeChangedStorageUnitNodeMap, final Map<StorageNodeName, DataSourcePoolProperties> dataSourcePoolPropsMap) {
        Collection<StorageNodeName> toBeChangedStorageNodeName = toBeChangedStorageUnitNodeMap.values().stream().map(StorageNode::getName).collect(Collectors.toSet());
-       Map<StorageNodeName, DataSource> storageNodes =
-               getNewStorageNodes(resourceMetaData, toBeChangedStorageNodeName, dataSourcePoolPropsMap);
+       Map<StorageNodeName, DataSource> storageNodes = getNewStorageNodes(resourceMetaData, toBeChangedStorageNodeName, dataSourcePoolPropsMap);
        Map<String, StorageNode> storageUnitNodeMap = getNewStorageUnitNodeMap(resourceMetaData, toBeChangedStorageUnitNodeMap);
        return new StorageResource(storageNodes, storageUnitNodeMap);
    }
@@ -88,7 +88,7 @@ private Map<String, Collection<ExportedStorageNode>> getAllStorageNodes(final Sh
    private Map<String, Collection<ExportedStorageNode>> generateDatabaseExportStorageNodesData(final ShardingSphereDatabase database) {
        Map<String, ExportedStorageNode> storageNodes = new LinkedHashMap<>();
        for (Entry<String, StorageUnit> entry : database.getResourceMetaData().getStorageUnits().entrySet()) {
-           ConnectionProperties connectionProps = database.getResourceMetaData().getConnectionProperties(entry.getKey());
+           ConnectionProperties connectionProps = database.getResourceMetaData().getStorageUnits().get(entry.getKey()).getConnectionProperties();
            String databaseInstanceIp = getDatabaseInstanceIp(connectionProps);
            if (storageNodes.containsKey(databaseInstanceIp)) {
                continue;
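
Since this loop already iterates the storage unit entry set, the same StorageUnit could be read straight off the entry instead of being re-resolved through the map. A possible equivalent form, not part of this PR:

for (Entry<String, StorageUnit> entry : database.getResourceMetaData().getStorageUnits().entrySet()) {
    // entry.getValue() is the same StorageUnit that getStorageUnits().get(entry.getKey()) returns.
    ConnectionProperties connectionProps = entry.getValue().getConnectionProperties();
    String databaseInstanceIp = getDatabaseInstanceIp(connectionProps);
    // ... rest of the loop body unchanged
}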
@@ -61,11 +61,11 @@ public Collection<LocalDataQueryResultRow> getRows(final ShardingSphereDatabase
        Collection<LocalDataQueryResultRow> result = new LinkedList<>();
        for (Entry<String, DataSourcePoolProperties> entry : getDataSourcePoolPropertiesMap(database, sqlStatement).entrySet()) {
            String key = entry.getKey();
-           ConnectionProperties connectionProps = resourceMetaData.getConnectionProperties(key);
+           ConnectionProperties connectionProps = resourceMetaData.getStorageUnits().get(key).getConnectionProperties();
            Map<String, Object> poolProps = entry.getValue().getPoolPropertySynonyms().getStandardProperties();
            Map<String, Object> customProps = getCustomProps(entry.getValue().getCustomProperties().getProperties(), connectionProps.getQueryProperties());
            result.add(new LocalDataQueryResultRow(key,
-                   resourceMetaData.getStorageType(key).getType(),
+                   resourceMetaData.getStorageUnits().get(key).getStorageType().getType(),
                    connectionProps.getHostname(),
                    connectionProps.getPort(),
                    connectionProps.getCatalog(),
@@ -96,7 +96,7 @@ protected Collection<String> getDatabaseNames(final ConnectionSession connection
    protected void preProcess(final String databaseName, final Map<String, Object> rows, final Map<String, String> alias) {
        ResourceMetaData resourceMetaData = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getResourceMetaData();
        Collection<String> catalogs = resourceMetaData.getStorageUnits().keySet()
-               .stream().map(each -> resourceMetaData.getConnectionProperties(each).getCatalog()).collect(Collectors.toSet());
+               .stream().map(each -> resourceMetaData.getStorageUnits().get(each).getConnectionProperties().getCatalog()).collect(Collectors.toSet());
        schemaNameAlias = alias.getOrDefault(SCHEMA_NAME, "");
        String rowValue = rows.getOrDefault(schemaNameAlias, "").toString();
        queryDatabase = !rowValue.isEmpty();
@@ -173,7 +173,7 @@ private Collection<ShardingSphereRule> createGlobalRules() {
    private Map<String, StorageUnit> createStorageUnits(final DatabaseConfiguration databaseConfig, final DatabaseType databaseType) {
        Map<String, StorageUnit> result = new LinkedHashMap<>(databaseConfig.getDataSources().size(), 1F);
        for (Entry<String, DataSource> entry : databaseConfig.getDataSources().entrySet()) {
-           StorageUnit storageUnit = mock(StorageUnit.class);
+           StorageUnit storageUnit = mock(StorageUnit.class, RETURNS_DEEP_STUBS);
            when(storageUnit.getStorageType()).thenReturn(databaseType);
            result.put(entry.getKey(), storageUnit);
        }