Skip to content

Commit

Permalink
Refactor StorageUnitMetaData to group by storage unit name (#28621)
Browse files Browse the repository at this point in the history
* Remove StorageUnitMetaData.storageNodes

* Remove StorageUnitMetaData.dataSourcePoolPropertiesMap

* Remove StorageUnitMetaData.dataSourceMap

* Remove StorageUnitMetaData.storageUnits

* Fix CI

* Code format

* Try to fix IT

* Try to fix IT
  • Loading branch information
terrymanu authored Oct 1, 2023
1 parent d8f5320 commit 4ee35fb
Show file tree
Hide file tree
Showing 98 changed files with 571 additions and 414 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Optional<GaugeMetricFamilyMetricsCollector> export(final String pluginTyp
ShardingSphereDataSource dataSource = (ShardingSphereDataSource) entry.getValue();
String databaseName = AgentReflectionUtils.getFieldValue(dataSource, "databaseName");
ContextManager contextManager = AgentReflectionUtils.getFieldValue(dataSource, "contextManager");
result.addMetric(Arrays.asList(databaseName, "storage_unit_count"), contextManager.getStorageUnits(databaseName).size());
result.addMetric(Arrays.asList(databaseName, "storage_unit_count"), contextManager.getStorageUnitMetaDataMap(databaseName).size());
}
return Optional.of(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ public Optional<GaugeMetricFamilyMetricsCollector> export(final String pluginTyp
}

private int getStorageUnitCount(final MetaDataContexts metaDataContexts) {
return metaDataContexts.getMetaData().getDatabases().values().stream().map(each -> each.getResourceMetaData().getStorageUnitMetaData().getStorageUnits().size()).reduce(0, Integer::sum);
return metaDataContexts.getMetaData().getDatabases().values().stream().map(each -> each.getResourceMetaData().getStorageUnitMetaData().getMetaDataMap().size()).reduce(0, Integer::sum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.NewStorageUnitMetaData;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.manager.ContextManager;
Expand Down Expand Up @@ -75,7 +75,7 @@ void assertExportWithContextManager() {

private ContextManager mockContextManager() {
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
when(database.getResourceMetaData().getStorageUnitMetaData().getStorageUnits()).thenReturn(Collections.singletonMap("ds_0", mock(StorageUnit.class)));
when(database.getResourceMetaData().getStorageUnitMetaData().getMetaDataMap()).thenReturn(Collections.singletonMap("ds_0", mock(NewStorageUnitMetaData.class)));
when(database.getProtocolType()).thenReturn(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
ShardingSphereMetaData metaData = mock(ShardingSphereMetaData.class);
when(metaData.getDatabases()).thenReturn(Collections.singletonMap("sharding_db", database));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void checkToBeCreatedEncryptors(final CreateEncryptRuleStatement sqlStat
}

private void checkDataSources(final ShardingSphereDatabase database) {
ShardingSpherePreconditions.checkState(!database.getResourceMetaData().getStorageUnitMetaData().getStorageUnits().isEmpty(), () -> new EmptyStorageUnitException(database.getName()));
ShardingSpherePreconditions.checkState(!database.getResourceMetaData().getStorageUnitMetaData().getMetaDataMap().isEmpty(), () -> new EmptyStorageUnitException(database.getName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ private static Collection<String> getDuplicated(final Collection<String> require
private static void checkDuplicateRuleNamesWithExistsDataSources(final ShardingSphereDatabase database, final Collection<ReadwriteSplittingRuleSegment> segments) {
Collection<String> currentRuleNames = new HashSet<>();
ResourceMetaData resourceMetaData = database.getResourceMetaData();
if (null != resourceMetaData && null != resourceMetaData.getStorageUnitMetaData().getStorageUnits()) {
currentRuleNames.addAll(resourceMetaData.getStorageUnitMetaData().getStorageUnits().keySet());
if (null != resourceMetaData && null != resourceMetaData.getStorageUnitMetaData().getMetaDataMap()) {
currentRuleNames.addAll(resourceMetaData.getStorageUnitMetaData().getMetaDataMap().keySet());
}
currentRuleNames.addAll(getLogicDataSources(database));
Collection<String> toBeCreatedRuleNames = segments.stream().map(ReadwriteSplittingRuleSegment::getName).filter(currentRuleNames::contains).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ void before() {

@Test
void assertCheckSQLStatementWithDuplicateRuleNames() {
when(resourceMetaData.getStorageUnitMetaData().getStorageUnits()).thenReturn(Collections.emptyMap());
when(resourceMetaData.getStorageUnitMetaData().getMetaDataMap()).thenReturn(Collections.emptyMap());
assertThrows(DuplicateRuleException.class, () -> updater.checkSQLStatement(database, createSQLStatement("TEST"), createCurrentRuleConfiguration()));
}

@Test
void assertCheckSQLStatementWithDuplicateResource() {
when(resourceMetaData.getStorageUnitMetaData().getStorageUnits()).thenReturn(Collections.singletonMap("write_ds", null));
when(resourceMetaData.getStorageUnitMetaData().getMetaDataMap()).thenReturn(Collections.singletonMap("write_ds", null));
assertThrows(InvalidRuleConfigurationException.class, () -> updater.checkSQLStatement(database, createSQLStatement("write_ds", "TEST"), createCurrentRuleConfiguration()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ public void check(final String databaseName, final ShadowRuleConfiguration confi
}

private void checkDataSources(final Map<String, ShadowDataSourceConfiguration> shadowDataSources, final Map<String, DataSource> dataSourceMap, final String databaseName) {
Set<String> dataSource = dataSourceMap.keySet();
Set<String> dataSourceName = dataSourceMap.keySet();
for (Entry<String, ShadowDataSourceConfiguration> entry : shadowDataSources.entrySet()) {
ShardingSpherePreconditions.checkState(dataSource.contains(entry.getValue().getProductionDataSourceName()),
ShardingSpherePreconditions.checkState(dataSourceName.contains(entry.getValue().getProductionDataSourceName()),
() -> new MissingRequiredShadowConfigurationException("ProductionDataSourceName", databaseName));
ShardingSpherePreconditions.checkState(dataSource.contains(entry.getValue().getShadowDataSourceName()),
ShardingSpherePreconditions.checkState(dataSourceName.contains(entry.getValue().getShadowDataSourceName()),
() -> new MissingRequiredShadowConfigurationException("ShadowDataSourceName", databaseName));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.NewStorageUnitMetaData;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
Expand Down Expand Up @@ -81,18 +81,18 @@ private void collectForShardingStatisticTable(final ShardingSphereDatabase datab
row.add(each.getLogicTable());
row.add(dataNode.getDataSourceName());
row.add(dataNode.getTableName());
addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnitMetaData().getStorageUnits(), dataNode, row);
addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnitMetaData().getMetaDataMap(), dataNode, row);
tableData.getRows().add(new ShardingSphereRowData(row));
}
}
}

private void addTableRowsAndDataLength(final Map<String, StorageUnit> storageUnits, final DataNode dataNode, final List<Object> row) throws SQLException {
DatabaseType databaseType = storageUnits.get(dataNode.getDataSourceName()).getStorageType();
private void addTableRowsAndDataLength(final Map<String, NewStorageUnitMetaData> metaDataMap, final DataNode dataNode, final List<Object> row) throws SQLException {
DatabaseType databaseType = metaDataMap.get(dataNode.getDataSourceName()).getStorageUnit().getStorageType();
Optional<DialectShardingStatisticsTableCollector> dialectCollector = DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class, databaseType);
boolean isAppended = false;
if (dialectCollector.isPresent()) {
try (Connection connection = storageUnits.get(dataNode.getDataSourceName()).getDataSource().getConnection()) {
try (Connection connection = metaDataMap.get(dataNode.getDataSourceName()).getDataSource().getConnection()) {
isAppended = dialectCollector.get().appendRow(connection, dataNode, row);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.state.datasource.DataSourceStateManager;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.state.datasource.DataSourceStateManager;

import javax.sql.DataSource;
import java.sql.Connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ private void closeResources(final ShardingSphereDatabase database) {
database.getRuleMetaData().findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class).ifPresent(StaticDataSourceContainedRule::cleanStorageNodeDataSources);
Optional.ofNullable(database.getResourceMetaData())
.ifPresent(optional -> optional.getStorageUnitMetaData().getStorageUnits().values().forEach(each -> new DataSourcePoolDestroyer(each.getDataSource()).asyncDestroy()));
.ifPresent(optional -> optional.getStorageUnitMetaData().getMetaDataMap().values().forEach(each -> new DataSourcePoolDestroyer(each.getDataSource()).asyncDestroy()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.metadata.database.resource.StorageResource;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.StorageResource;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.database.schema.builder.GenericSchemaBuilder;
import org.apache.shardingsphere.infra.metadata.database.schema.builder.GenericSchemaBuilderMaterial;
Expand All @@ -38,12 +38,15 @@
import org.apache.shardingsphere.infra.rule.identifier.type.MutableDataNodeRule;
import org.apache.shardingsphere.infra.state.datasource.DataSourceStateManager;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* ShardingSphere database.
Expand Down Expand Up @@ -172,7 +175,7 @@ public void dropSchema(final String schemaName) {
* @return is completed or not
*/
public boolean isComplete() {
return !ruleMetaData.getRules().isEmpty() && !resourceMetaData.getStorageUnitMetaData().getStorageUnits().isEmpty();
return !ruleMetaData.getRules().isEmpty() && !resourceMetaData.getStorageUnitMetaData().getMetaDataMap().isEmpty();
}

/**
Expand All @@ -181,7 +184,7 @@ public boolean isComplete() {
* @return contains data source or not
*/
public boolean containsDataSource() {
return !resourceMetaData.getStorageUnitMetaData().getStorageUnits().isEmpty();
return !resourceMetaData.getStorageUnitMetaData().getMetaDataMap().isEmpty();
}

/**
Expand All @@ -195,7 +198,9 @@ public synchronized void reloadRules(final Class<? extends ShardingSphereRule> r
Collection<ShardingSphereRule> databaseRules = new LinkedList<>(ruleMetaData.getRules());
toBeReloadedRules.stream().findFirst().ifPresent(optional -> {
databaseRules.removeAll(toBeReloadedRules);
databaseRules.add(((MutableDataNodeRule) optional).reloadRule(ruleConfig, name, resourceMetaData.getStorageUnitMetaData().getDataSources(), databaseRules));
Map<String, DataSource> dataSources = resourceMetaData.getStorageUnitMetaData().getMetaDataMap().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
databaseRules.add(((MutableDataNodeRule) optional).reloadRule(ruleConfig, name, dataSources, databaseRules));
});
ruleMetaData.getRules().clear();
ruleMetaData.getRules().addAll(databaseRules);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.shardingsphere.infra.metadata.database.resource.node.StorageNode;
import org.apache.shardingsphere.infra.metadata.database.resource.node.StorageNodeName;
import org.apache.shardingsphere.infra.metadata.database.resource.node.StorageNodeUtils;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnitMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnitNodeMapUtils;

Expand Down Expand Up @@ -68,17 +67,17 @@ public ResourceMetaData(final String databaseName, final Map<StorageNodeName, Da
*/
public Collection<String> getAllInstanceDataSourceNames() {
Collection<String> result = new LinkedList<>();
for (Entry<String, StorageUnit> entry : storageUnitMetaData.getStorageUnits().entrySet()) {
if (!isExisted(entry.getKey(), result)) {
result.add(entry.getKey());
for (String each : storageUnitMetaData.getMetaDataMap().keySet()) {
if (!isExisted(each, result)) {
result.add(each);
}
}
return result;
}

private boolean isExisted(final String dataSourceName, final Collection<String> existedDataSourceNames) {
return existedDataSourceNames.stream().anyMatch(each -> storageUnitMetaData.getStorageUnits().get(dataSourceName).getConnectionProperties()
.isInSameDatabaseInstance(storageUnitMetaData.getStorageUnits().get(each).getConnectionProperties()));
return existedDataSourceNames.stream().anyMatch(each -> storageUnitMetaData.getMetaDataMap().get(dataSourceName).getStorageUnit().getConnectionProperties()
.isInSameDatabaseInstance(storageUnitMetaData.getMetaDataMap().get(each).getStorageUnit().getConnectionProperties()));
}

/**
Expand All @@ -88,7 +87,7 @@ private boolean isExisted(final String dataSourceName, final Collection<String>
* @return connection properties
*/
public ConnectionProperties getConnectionProperties(final String dataSourceName) {
return storageUnitMetaData.getStorageUnits().get(dataSourceName).getConnectionProperties();
return storageUnitMetaData.getMetaDataMap().get(dataSourceName).getStorageUnit().getConnectionProperties();
}

/**
Expand All @@ -98,7 +97,7 @@ public ConnectionProperties getConnectionProperties(final String dataSourceName)
* @return storage type
*/
public DatabaseType getStorageType(final String dataSourceName) {
return storageUnitMetaData.getStorageUnits().get(dataSourceName).getStorageType();
return storageUnitMetaData.getMetaDataMap().get(dataSourceName).getStorageUnit().getStorageType();
}

/**
Expand All @@ -108,6 +107,6 @@ public DatabaseType getStorageType(final String dataSourceName) {
* @return not existed resource names
*/
public Collection<String> getNotExistedDataSources(final Collection<String> resourceNames) {
return resourceNames.stream().filter(each -> !storageUnitMetaData.getStorageUnits().containsKey(each)).collect(Collectors.toSet());
return resourceNames.stream().filter(each -> !storageUnitMetaData.getMetaDataMap().containsKey(each)).collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.metadata.database.resource.unit;

import lombok.Getter;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.metadata.database.resource.node.StorageNode;

import javax.sql.DataSource;

/**
* Storage unit meta data.
*/
@Getter
public final class NewStorageUnitMetaData {

private final StorageNode storageNode;

private final DataSourcePoolProperties dataSourcePoolProperties;

private final DataSource dataSource;

private final StorageUnit storageUnit;

public NewStorageUnitMetaData(final String databaseName, final StorageNode storageNode, final DataSourcePoolProperties dataSourcePoolProperties, final DataSource dataSource) {
this.storageNode = storageNode;
this.dataSourcePoolProperties = dataSourcePoolProperties;
this.dataSource = dataSource;
storageUnit = new StorageUnit(databaseName, dataSource, dataSourcePoolProperties, storageNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ private ConnectionProperties createConnectionProperties(final boolean isDataSour
}
Map<String, Object> standardProps = DataSourcePoolPropertiesCreator.create(dataSource).getConnectionPropertySynonyms().getStandardProperties();
ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, storageType);
return parser.parse(standardProps.get("url").toString(), standardProps.get("username").toString(), storageNode.getCatalog());
return parser.parse(standardProps.get("url").toString(), standardProps.getOrDefault("username", "").toString(), storageNode.getCatalog());
}
}
Loading

0 comments on commit 4ee35fb

Please sign in to comment.