diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java index e0735b75a0c94..2165b336f63d7 100644 --- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java +++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.sharding.metadata.data; +import org.apache.shardingsphere.infra.database.DatabaseTypeEngine; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; @@ -29,10 +30,12 @@ import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData; import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData; import org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector; +import org.apache.shardingsphere.infra.rule.attribute.datasource.aggregate.AggregatedDataSourceRuleAttribute; import org.apache.shardingsphere.sharding.metadata.data.dialect.DialectShardingStatisticsTableCollector; import org.apache.shardingsphere.sharding.rule.ShardingRule; import org.apache.shardingsphere.sharding.rule.ShardingTable; +import javax.sql.DataSource; import java.math.BigDecimal; import java.sql.Connection; import java.sql.SQLException; @@ -81,19 +84,34 @@ private void collectForShardingStatisticTable(final ShardingSphereDatabase datab row.add(each.getLogicTable()); row.add(dataNode.getDataSourceName()); row.add(dataNode.getTableName()); - addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(), dataNode, row); + addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(), dataNode, row, rule); tableData.getRows().add(new ShardingSphereRowData(row)); } } } - private void addTableRowsAndDataLength(final Map storageUnits, final DataNode dataNode, final List row) throws SQLException { + private void addTableRowsAndDataLength(final Map storageUnits, final DataNode dataNode, final List row, final ShardingRule rule) throws SQLException { + DataSource dataSource; + DatabaseType databaseType; StorageUnit storageUnit = storageUnits.get(dataNode.getDataSourceName()); - DatabaseType databaseType = storageUnit.getStorageType(); - Optional dialectCollector = DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class, databaseType); + if (null != storageUnit) { + dataSource = storageUnit.getDataSource(); + databaseType = storageUnit.getStorageType(); + } else { + Optional aggregatedDataSourceRuleAttribute = rule.getAttributes().findAttribute(AggregatedDataSourceRuleAttribute.class); + dataSource = aggregatedDataSourceRuleAttribute.map(optional -> optional.getAggregatedDataSources().get(dataNode.getDataSourceName())).orElse(null); + databaseType = null != dataSource ? DatabaseTypeEngine.getStorageType(dataSource) : null; + } + if (null != dataSource && null != databaseType) { + addTableRowsAndDataLength(databaseType, dataSource, dataNode, row); + } + } + + private void addTableRowsAndDataLength(final DatabaseType databaseType, final DataSource dataSource, final DataNode dataNode, final List row) throws SQLException { boolean isAppended = false; + Optional dialectCollector = DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class, databaseType); if (dialectCollector.isPresent()) { - try (Connection connection = storageUnit.getDataSource().getConnection()) { + try (Connection connection = dataSource.getConnection()) { isAppended = dialectCollector.get().appendRow(connection, dataNode, row); } }