Skip to content

Commit

Permalink
Refactor ContextManager
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Sep 30, 2023
1 parent 1bab8bd commit 65f3a08
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Optional<GaugeMetricFamilyMetricsCollector> export(final String pluginTyp
ShardingSphereDataSource dataSource = (ShardingSphereDataSource) entry.getValue();
String databaseName = AgentReflectionUtils.getFieldValue(dataSource, "databaseName");
ContextManager contextManager = AgentReflectionUtils.getFieldValue(dataSource, "contextManager");
result.addMetric(Arrays.asList(databaseName, "storage_unit_count"), contextManager.getStorageUnits(databaseName).size());
result.addMetric(Arrays.asList(databaseName, "storage_unit_count"), contextManager.getStorageUnitMetaDataMap(databaseName).size());
}
return Optional.of(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.NewStorageUnitMetaData;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
Expand Down Expand Up @@ -89,7 +89,7 @@ public final class DriverDatabaseConnectionManager implements DatabaseConnection
private final String databaseName;

public DriverDatabaseConnectionManager(final String databaseName, final ContextManager contextManager) {
for (Entry<String, StorageUnit> entry : contextManager.getStorageUnits(databaseName).entrySet()) {
for (Entry<String, NewStorageUnitMetaData> entry : contextManager.getStorageUnitMetaDataMap(databaseName).entrySet()) {
DataSource dataSource = entry.getValue().getDataSource();
String cacheKey = getKey(databaseName, entry.getKey());
dataSourceMap.put(cacheKey, dataSource);
Expand Down Expand Up @@ -341,7 +341,9 @@ public List<Connection> getConnections(final String dataSourceName, final int co
private List<Connection> getConnections(final String currentDatabaseName, final String dataSourceName, final int connectionOffset, final int connectionSize,
final ConnectionMode connectionMode) throws SQLException {
String cacheKey = getKey(currentDatabaseName, dataSourceName);
DataSource dataSource = databaseName.equals(currentDatabaseName) ? dataSourceMap.get(cacheKey) : contextManager.getStorageUnits(currentDatabaseName).get(dataSourceName).getDataSource();
DataSource dataSource = databaseName.equals(currentDatabaseName)
? dataSourceMap.get(cacheKey)
: contextManager.getStorageUnitMetaDataMap(currentDatabaseName).get(dataSourceName).getDataSource();
Preconditions.checkNotNull(dataSource, "Missing the data source name: '%s'", dataSourceName);
Collection<Connection> connections;
synchronized (cachedConnections) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaDataBuilder;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.NewStorageUnitMetaData;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.manager.ContextManager;
Expand Down Expand Up @@ -102,7 +102,7 @@ public Connection getConnection(final String username, final String password) {
@Override
public void close() throws SQLException {
contextManagerDestroyedCallback(databaseName);
for (StorageUnit each : contextManager.getStorageUnits(databaseName).values()) {
for (NewStorageUnitMetaData each : contextManager.getStorageUnitMetaDataMap(databaseName).values()) {
close(each.getDataSource());
}
contextManager.close();
Expand Down Expand Up @@ -133,13 +133,13 @@ private void contextManagerDestroyedCallback(final String databaseName) {

@Override
public int getLoginTimeout() throws SQLException {
Map<String, StorageUnit> storageUnits = contextManager.getStorageUnits(databaseName);
Map<String, NewStorageUnitMetaData> storageUnits = contextManager.getStorageUnitMetaDataMap(databaseName);
return storageUnits.isEmpty() ? 0 : storageUnits.values().iterator().next().getDataSource().getLoginTimeout();
}

@Override
public void setLoginTimeout(final int seconds) throws SQLException {
for (StorageUnit each : contextManager.getStorageUnits(databaseName).values()) {
for (NewStorageUnitMetaData each : contextManager.getStorageUnitMetaDataMap(databaseName).values()) {
each.getDataSource().setLoginTimeout(seconds);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.NewStorageUnitMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
Expand Down Expand Up @@ -75,8 +76,8 @@ void setUp() throws SQLException {

private ContextManager mockContextManager() throws SQLException {
ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
Map<String, StorageUnit> storageUnits = mockStorageUnits();
when(result.getStorageUnits(DefaultDatabase.LOGIC_NAME)).thenReturn(storageUnits);
Map<String, NewStorageUnitMetaData> metaDataMap = mockStorageUnitMetaDataMap();
when(result.getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME)).thenReturn(metaDataMap);
MetaDataPersistService persistService = mockMetaDataPersistService();
when(result.getMetaDataContexts().getPersistService()).thenReturn(persistService);
when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(
Expand All @@ -88,16 +89,20 @@ private ContextManager mockContextManager() throws SQLException {
return result;
}

private Map<String, StorageUnit> mockStorageUnits() throws SQLException {
Map<String, StorageUnit> result = new HashMap<>(2, 1F);
StorageUnit validStorageUnit = mock(StorageUnit.class);
when(validStorageUnit.getDataSource()).thenReturn(new MockedDataSource());
result.put("ds", validStorageUnit);
StorageUnit invalidStorageUnit = mock(StorageUnit.class);
private Map<String, NewStorageUnitMetaData> mockStorageUnitMetaDataMap() throws SQLException {
Map<String, NewStorageUnitMetaData> result = new HashMap<>(2, 1F);
result.put("ds", mockStorageUnitMetaData(new MockedDataSource()));
DataSource invalidDataSource = mock(DataSource.class);
when(invalidDataSource.getConnection()).thenThrow(new SQLException());
when(invalidStorageUnit.getDataSource()).thenReturn(invalidDataSource);
result.put("invalid_ds", invalidStorageUnit);
result.put("invalid_ds", mockStorageUnitMetaData(invalidDataSource));
return result;
}

private NewStorageUnitMetaData mockStorageUnitMetaData(final DataSource dataSource) {
NewStorageUnitMetaData result = mock(NewStorageUnitMetaData.class, RETURNS_DEEP_STUBS);
when(result.getDataSource()).thenReturn(dataSource);
StorageUnit storageUnit = mock(StorageUnit.class);
when(result.getStorageUnit()).thenReturn(storageUnit);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.NewStorageUnitMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
Expand All @@ -33,6 +34,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.internal.configuration.plugins.Plugins;

import javax.sql.DataSource;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
Expand Down Expand Up @@ -251,7 +253,11 @@ private ContextManager mockContextManager() {

private ContextManager mockContextManager(final StorageUnit storageUnit) {
ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
when(result.getStorageUnits(DefaultDatabase.LOGIC_NAME)).thenReturn(Collections.singletonMap("ds", storageUnit));
NewStorageUnitMetaData storageUnitMetaData = mock(NewStorageUnitMetaData.class);
when(storageUnitMetaData.getStorageUnit()).thenReturn(storageUnit);
DataSource dataSource = storageUnit.getDataSource();
when(storageUnitMetaData.getDataSource()).thenReturn(dataSource);
when(result.getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME)).thenReturn(Collections.singletonMap("ds", storageUnitMetaData));
when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(new RuleMetaData(Arrays.asList(mockTransactionRule(), mock(TrafficRule.class))));
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void assertNewConstructorWithModeConfigurationOnly() throws Exception {
assertNotNull(contextManager.getMetaDataContexts().getMetaData().getDatabase(DefaultDatabase.LOGIC_NAME));
assertThat(contextManager.getClusterStateContext().getCurrentState(), is(ClusterState.OK));
assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(InstanceState.OK));
assertTrue(contextManager.getStorageUnits(DefaultDatabase.LOGIC_NAME).isEmpty());
assertTrue(contextManager.getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME).isEmpty());
}
}

Expand All @@ -72,8 +72,8 @@ void assertNewConstructorWithAllArguments() throws Exception {
assertNotNull(contextManager.getMetaDataContexts().getMetaData().getDatabase(DefaultDatabase.LOGIC_NAME));
assertThat(contextManager.getClusterStateContext().getCurrentState(), is(ClusterState.OK));
assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(InstanceState.OK));
assertThat(contextManager.getStorageUnits(DefaultDatabase.LOGIC_NAME).size(), is(1));
assertThat(contextManager.getStorageUnits(DefaultDatabase.LOGIC_NAME).get("ds").getDataSource().getConnection().getMetaData().getURL(), is("jdbc:mock://127.0.0.1/foo_ds"));
assertThat(contextManager.getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME).size(), is(1));
assertThat(contextManager.getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME).get("ds").getDataSource().getConnection().getMetaData().getURL(), is("jdbc:mock://127.0.0.1/foo_ds"));
}
}

Expand Down Expand Up @@ -106,15 +106,15 @@ private ShardingSphereDataSource createShardingSphereDataSource(final DataSource
@Test
void assertEmptyDataSourceMap() throws Exception {
try (ShardingSphereDataSource actual = new ShardingSphereDataSource(DefaultDatabase.LOGIC_NAME, null)) {
assertTrue(getContextManager(actual).getStorageUnits(DefaultDatabase.LOGIC_NAME).isEmpty());
assertTrue(getContextManager(actual).getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME).isEmpty());
assertThat(actual.getLoginTimeout(), is(0));
}
}

@Test
void assertNotEmptyDataSourceMap() throws Exception {
try (ShardingSphereDataSource actual = createShardingSphereDataSource(createHikariDataSource())) {
assertThat(getContextManager(actual).getStorageUnits(DefaultDatabase.LOGIC_NAME).size(), is(1));
assertThat(getContextManager(actual).getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME).size(), is(1));
assertThat(actual.getLoginTimeout(), is(15));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.NewStorageUnitMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.database.schema.SchemaManager;
Expand All @@ -47,11 +48,8 @@

import java.sql.SQLException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
* Context manager.
Expand Down Expand Up @@ -102,14 +100,13 @@ public synchronized void renewMetaDataContexts(final MetaDataContexts metaDataCo
}

/**
* Get storage units.
* Get storage unit meta data map.
*
* @param databaseName database name
* @return storage units
* @return storage unit meta data map
*/
public Map<String, StorageUnit> getStorageUnits(final String databaseName) {
return metaDataContexts.get().getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnitMetaData().getMetaDataMap().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getStorageUnit(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
public Map<String, NewStorageUnitMetaData> getStorageUnitMetaDataMap(final String databaseName) {
return metaDataContexts.get().getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnitMetaData().getMetaDataMap();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void assertGetDataSourceMap() {
ShardingSphereDatabase database =
new ShardingSphereDatabase(DefaultDatabase.LOGIC_NAME, mock(DatabaseType.class), resourceMetaData, mock(RuleMetaData.class), Collections.emptyMap());
when(metaDataContexts.getMetaData().getDatabase(DefaultDatabase.LOGIC_NAME)).thenReturn(database);
assertThat(contextManager.getStorageUnits(DefaultDatabase.LOGIC_NAME).size(), is(1));
assertThat(contextManager.getStorageUnitMetaDataMap(DefaultDatabase.LOGIC_NAME).size(), is(1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.dialect.exception.syntax.database.NoDatabaseSelectedException;
import org.apache.shardingsphere.infra.exception.dialect.exception.syntax.database.UnknownDatabaseException;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.NewStorageUnitMetaData;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.updater.ConnectionSessionRequiredRALUpdater;
Expand All @@ -43,7 +43,7 @@ public final class RefreshTableMetaDataUpdater implements ConnectionSessionRequi
public void executeUpdate(final ConnectionSession connectionSession, final RefreshTableMetaDataStatement sqlStatement) {
String databaseName = getDatabaseName(connectionSession);
ContextManager contextManager = ProxyContext.getInstance().getContextManager();
checkStorageUnits(databaseName, contextManager.getStorageUnits(databaseName), sqlStatement);
checkStorageUnitMetaData(databaseName, contextManager.getStorageUnitMetaDataMap(databaseName), sqlStatement);
String schemaName = getSchemaName(databaseName, sqlStatement, connectionSession);
if (sqlStatement.getStorageUnitName().isPresent()) {
if (sqlStatement.getTableName().isPresent()) {
Expand All @@ -60,12 +60,12 @@ public void executeUpdate(final ConnectionSession connectionSession, final Refre
}
}

private void checkStorageUnits(final String databaseName, final Map<String, StorageUnit> storageUnits, final RefreshTableMetaDataStatement sqlStatement) {
ShardingSpherePreconditions.checkState(!storageUnits.isEmpty(), () -> new EmptyStorageUnitException(databaseName));
private void checkStorageUnitMetaData(final String databaseName, final Map<String, NewStorageUnitMetaData> storageUnitMetaDataMap, final RefreshTableMetaDataStatement sqlStatement) {
ShardingSpherePreconditions.checkState(!storageUnitMetaDataMap.isEmpty(), () -> new EmptyStorageUnitException(databaseName));
if (sqlStatement.getStorageUnitName().isPresent()) {
String storageUnitName = sqlStatement.getStorageUnitName().get();
ShardingSpherePreconditions.checkState(
storageUnits.containsKey(storageUnitName), () -> new MissingRequiredStorageUnitsException(databaseName, Collections.singleton(storageUnitName)));
storageUnitMetaDataMap.containsKey(storageUnitName), () -> new MissingRequiredStorageUnitsException(databaseName, Collections.singleton(storageUnitName)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private void checkDuplicatedLogicalDataSourceNames(final String databaseName, fi
}

private Collection<String> getCurrentStorageUnitNames(final String databaseName) {
return ProxyContext.getInstance().getContextManager().getStorageUnits(databaseName).keySet();
return ProxyContext.getInstance().getContextManager().getStorageUnitMetaDataMap(databaseName).keySet();
}

private Collection<String> getLogicalDataSourceNames(final String databaseName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void assertUnknownDatabaseException() {
@Test
void assertEmptyResource() {
ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
when(contextManager.getStorageUnits("foo_db")).thenReturn(Collections.emptyMap());
when(contextManager.getStorageUnitMetaDataMap("foo_db")).thenReturn(Collections.emptyMap());
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
when(ProxyContext.getInstance().databaseExists("foo_db")).thenReturn(true);
UpdatableRALBackendHandler<?> backendHandler = new UpdatableRALBackendHandler<>(new RefreshTableMetaDataStatement(), mockConnectionSession("foo_db"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void assertExecuteWithDuplicateStorageUnitNamesInStatement() {
@Test
void assertExecuteWithDuplicateStorageUnitNamesWithResourceMetaData() {
ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
when(contextManager.getStorageUnits("foo_db").keySet()).thenReturn(Collections.singleton("ds_0"));
when(contextManager.getStorageUnitMetaDataMap("foo_db").keySet()).thenReturn(Collections.singleton("ds_0"));
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
assertThrows(DuplicateStorageUnitException.class, () -> handler.execute("foo_db", createRegisterStorageUnitStatement()));
}
Expand Down

0 comments on commit 65f3a08

Please sign in to comment.