Skip to content

Commit

Permalink
Merge branch 'apache:master' into testcontainers-java
Browse files Browse the repository at this point in the history
  • Loading branch information
linghengqian authored Nov 29, 2024
2 parents e7b5841 + 5e6e453 commit 0a161f0
Show file tree
Hide file tree
Showing 120 changed files with 1,107 additions and 1,509 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public final class BroadcastOrder {
/**
* Broadcast order.
*/
public static final int ORDER = 5;
public static final int ORDER = 15;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,148 +19,32 @@

import org.apache.shardingsphere.broadcast.constant.BroadcastOrder;
import org.apache.shardingsphere.broadcast.route.engine.BroadcastRouteEngineFactory;
import org.apache.shardingsphere.broadcast.route.engine.type.broadcast.BroadcastDatabaseBroadcastRouteEngine;
import org.apache.shardingsphere.broadcast.route.engine.type.broadcast.BroadcastInstanceBroadcastRouteEngine;
import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.context.statement.ddl.CloseStatementContext;
import org.apache.shardingsphere.infra.binder.context.type.CursorAvailable;
import org.apache.shardingsphere.infra.binder.context.type.IndexAvailable;
import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.route.context.RouteContext;
import org.apache.shardingsphere.infra.route.context.RouteMapper;
import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.route.type.DecorateSQLRouter;
import org.apache.shardingsphere.infra.route.type.EntranceSQLRouter;
import org.apache.shardingsphere.infra.route.type.TableSQLRouter;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.dal.DALStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.dcl.DCLStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.AlterFunctionStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.AlterProcedureStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.AlterTablespaceStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.CreateFunctionStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.CreateProcedureStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.CreateTablespaceStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.DDLStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.DropFunctionStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.DropProcedureStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.DropTablespaceStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.TCLStatement;
import org.apache.shardingsphere.sql.parser.statement.mysql.dal.MySQLCreateResourceGroupStatement;
import org.apache.shardingsphere.sql.parser.statement.mysql.dal.MySQLSetResourceGroupStatement;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;

/**
* Broadcast SQL router.
*/
@HighFrequencyInvocation
public final class BroadcastSQLRouter implements EntranceSQLRouter<BroadcastRule>, DecorateSQLRouter<BroadcastRule>, TableSQLRouter<BroadcastRule> {
public final class BroadcastSQLRouter implements EntranceSQLRouter<BroadcastRule>, TableSQLRouter<BroadcastRule> {

@Override
public RouteContext createRouteContext(final QueryContext queryContext, final RuleMetaData globalRuleMetaData,
final ShardingSphereDatabase database, final BroadcastRule rule, final Collection<String> tableNames, final ConfigurationProperties props) {
return BroadcastRouteEngineFactory.newInstance(rule, database, queryContext).route(new RouteContext(), rule);
}

@Override
public void decorateRouteContext(final RouteContext routeContext, final QueryContext queryContext,
final ShardingSphereDatabase database, final BroadcastRule rule, final Collection<String> tableNames, final ConfigurationProperties props) {
SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext();
SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
if (sqlStatement instanceof TCLStatement) {
decorateRouteContextWhenTCLStatement(routeContext, rule);
} else if (sqlStatement instanceof DDLStatement) {
decorateRouteContextWhenDDLStatement(routeContext, queryContext, database, rule);
} else if (sqlStatement instanceof DALStatement && isResourceGroupStatement(sqlStatement)) {
doInstanceBroadcastRoute(routeContext, database, rule);
} else if (sqlStatement instanceof DCLStatement && !isDCLForSingleTable(queryContext.getSqlStatementContext())) {
doInstanceBroadcastRoute(routeContext, database, rule);
}
}

private void decorateRouteContextWhenTCLStatement(final RouteContext routeContext, final BroadcastRule rule) {
doDatabaseBroadcastRoute(routeContext, rule);
}

private void decorateRouteContextWhenDDLStatement(final RouteContext routeContext, final QueryContext queryContext, final ShardingSphereDatabase database, final BroadcastRule rule) {
SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext();
if (sqlStatementContext instanceof CursorAvailable) {
if (sqlStatementContext instanceof CloseStatementContext && ((CloseStatementContext) sqlStatementContext).getSqlStatement().isCloseAll()) {
doDatabaseBroadcastRoute(routeContext, rule);
}
return;
}
if (sqlStatementContext instanceof IndexAvailable && !routeContext.getRouteUnits().isEmpty()) {
putAllBroadcastTables(routeContext, rule, sqlStatementContext);
}
SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
boolean functionStatement = sqlStatement instanceof CreateFunctionStatement || sqlStatement instanceof AlterFunctionStatement || sqlStatement instanceof DropFunctionStatement;
boolean procedureStatement = sqlStatement instanceof CreateProcedureStatement || sqlStatement instanceof AlterProcedureStatement || sqlStatement instanceof DropProcedureStatement;
if (functionStatement || procedureStatement) {
doDatabaseBroadcastRoute(routeContext, rule);
return;
}
// TODO BEGIN extract db route logic to common database router, eg: DCL in instance route @duanzhengqiang
if (sqlStatement instanceof CreateTablespaceStatement || sqlStatement instanceof AlterTablespaceStatement || sqlStatement instanceof DropTablespaceStatement) {
doInstanceBroadcastRoute(routeContext, database, rule);
public RouteContext createRouteContext(final QueryContext queryContext, final RuleMetaData globalRuleMetaData, final ShardingSphereDatabase database,
final BroadcastRule rule, final Collection<String> tableNames, final ConfigurationProperties props) {
Collection<String> broadcastTableNames = rule.getBroadcastTableNames(tableNames);
if (broadcastTableNames.isEmpty()) {
return new RouteContext();
}
// TODO END extract db route logic to common database router, eg: DCL in instance route
Collection<String> tableNames = sqlStatementContext instanceof TableAvailable ? getTableNames((TableAvailable) sqlStatementContext) : Collections.emptyList();
if (rule.isAllBroadcastTables(tableNames)) {
doInstanceBroadcastRoute(routeContext, database, rule);
}
}

private Collection<String> getTableNames(final TableAvailable sqlStatementContext) {
Collection<SimpleTableSegment> tableSegments = sqlStatementContext.getTablesContext().getSimpleTables();
Collection<String> result = new LinkedHashSet<>(tableSegments.size());
for (SimpleTableSegment each : tableSegments) {
result.add(each.getTableName().getIdentifier().getValue());
}
return result;
}

private void putAllBroadcastTables(final RouteContext routeContext, final BroadcastRule rule, final SQLStatementContext sqlStatementContext) {
Collection<String> tableNames = sqlStatementContext instanceof TableAvailable ? ((TableAvailable) sqlStatementContext).getTablesContext().getTableNames() : Collections.emptyList();
for (String each : rule.filterBroadcastTableNames(tableNames)) {
for (RouteUnit routeUnit : routeContext.getRouteUnits()) {
routeUnit.getTableMappers().add(new RouteMapper(each, each));
}
}
}

private boolean isResourceGroupStatement(final SQLStatement sqlStatement) {
// TODO add dropResourceGroupStatement, alterResourceGroupStatement
return sqlStatement instanceof MySQLCreateResourceGroupStatement || sqlStatement instanceof MySQLSetResourceGroupStatement;
}

private boolean isDCLForSingleTable(final SQLStatementContext sqlStatementContext) {
if (sqlStatementContext instanceof TableAvailable) {
TableAvailable tableSegmentsAvailable = (TableAvailable) sqlStatementContext;
return 1 == tableSegmentsAvailable.getTablesContext().getSimpleTables().size()
&& !"*".equals(tableSegmentsAvailable.getTablesContext().getSimpleTables().iterator().next().getTableName().getIdentifier().getValue());
}
return false;
}

private void doDatabaseBroadcastRoute(final RouteContext routeContext, final BroadcastRule rule) {
routeContext.getRouteUnits().clear();
routeContext.getRouteUnits().addAll(new BroadcastDatabaseBroadcastRouteEngine().route(new RouteContext(), rule).getRouteUnits());
}

private void doInstanceBroadcastRoute(final RouteContext routeContext, final ShardingSphereDatabase database, final BroadcastRule rule) {
routeContext.getRouteUnits().clear();
routeContext.getRouteUnits().addAll(new BroadcastInstanceBroadcastRouteEngine(database.getResourceMetaData()).route(new RouteContext(), rule).getRouteUnits());
return BroadcastRouteEngineFactory.newInstance(queryContext, broadcastTableNames).route(rule);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,17 @@
import org.apache.shardingsphere.broadcast.route.engine.type.BroadcastRouteEngine;
import org.apache.shardingsphere.broadcast.route.engine.type.broadcast.BroadcastDatabaseBroadcastRouteEngine;
import org.apache.shardingsphere.broadcast.route.engine.type.broadcast.BroadcastTableBroadcastRouteEngine;
import org.apache.shardingsphere.broadcast.route.engine.type.ignore.BroadcastIgnoreRouteEngine;
import org.apache.shardingsphere.broadcast.route.engine.type.unicast.BroadcastUnicastRouteEngine;
import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.binder.context.extractor.SQLStatementContextExtractor;
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.context.statement.ddl.CloseStatementContext;
import org.apache.shardingsphere.infra.binder.context.type.CursorAvailable;
import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.dal.DALStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.dcl.DCLStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.DDLStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.dml.SelectStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.TCLStatement;

import java.util.Collection;

Expand All @@ -53,70 +46,42 @@ public final class BroadcastRouteEngineFactory {
/**
* Create new instance of broadcast routing engine.
*
* @param rule broadcast rule
* @param database database
* @param queryContext query context
* @param broadcastTableNames broadcast table names
* @return broadcast route engine
*/
public static BroadcastRouteEngine newInstance(final BroadcastRule rule, final ShardingSphereDatabase database, final QueryContext queryContext) {
public static BroadcastRouteEngine newInstance(final QueryContext queryContext, final Collection<String> broadcastTableNames) {
SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext();
SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
if (sqlStatement instanceof TCLStatement) {
return new BroadcastDatabaseBroadcastRouteEngine();
}
if (sqlStatement instanceof DDLStatement) {
return sqlStatementContext instanceof CursorAvailable
? getCursorRouteEngine(rule, sqlStatementContext, queryContext.getConnectionContext())
: getDDLRouteEngine(rule, database, sqlStatementContext);
}
if (!(sqlStatementContext instanceof TableAvailable)) {
return new BroadcastIgnoreRouteEngine();
}
Collection<String> tableNames = ((TableAvailable) sqlStatementContext).getTablesContext().getTableNames();
if (tableNames.isEmpty()) {
return new BroadcastIgnoreRouteEngine();
return getDDLRouteEngine(queryContext, broadcastTableNames, sqlStatementContext);
}
if (sqlStatement instanceof DALStatement) {
return getDALRouteEngine(rule, tableNames);
return getDALRouteEngine(broadcastTableNames);
}
if (sqlStatement instanceof DCLStatement) {
return getDCLRouteEngine(rule, tableNames);
}
return getDMLRouteEngine(rule, sqlStatementContext, queryContext.getConnectionContext(), tableNames);
}

private static BroadcastRouteEngine getCursorRouteEngine(final BroadcastRule rule, final SQLStatementContext sqlStatementContext, final ConnectionContext connectionContext) {
if (sqlStatementContext instanceof CloseStatementContext && ((CloseStatementContext) sqlStatementContext).getSqlStatement().isCloseAll()) {
return new BroadcastDatabaseBroadcastRouteEngine();
return getDCLRouteEngine(broadcastTableNames);
}
if (sqlStatementContext instanceof TableAvailable) {
Collection<String> tableNames = ((TableAvailable) sqlStatementContext).getTablesContext().getTableNames();
return rule.isAllBroadcastTables(tableNames) ? new BroadcastUnicastRouteEngine(sqlStatementContext, tableNames, connectionContext) : new BroadcastIgnoreRouteEngine();
}
return new BroadcastIgnoreRouteEngine();
return getDMLRouteEngine(sqlStatementContext, queryContext.getConnectionContext(), broadcastTableNames);
}

private static BroadcastRouteEngine getDDLRouteEngine(final BroadcastRule rule, final ShardingSphereDatabase database, final SQLStatementContext sqlStatementContext) {
Collection<String> tableNames = SQLStatementContextExtractor.getTableNames(database, sqlStatementContext);
return rule.isAllBroadcastTables(tableNames) ? new BroadcastTableBroadcastRouteEngine(tableNames) : new BroadcastIgnoreRouteEngine();
private static BroadcastRouteEngine getDDLRouteEngine(final QueryContext queryContext, final Collection<String> broadcastTableNames, final SQLStatementContext sqlStatementContext) {
return sqlStatementContext instanceof CursorAvailable
? new BroadcastUnicastRouteEngine(sqlStatementContext, broadcastTableNames, queryContext.getConnectionContext())
: new BroadcastTableBroadcastRouteEngine(broadcastTableNames);
}

private static BroadcastRouteEngine getDALRouteEngine(final BroadcastRule rule, final Collection<String> tableNames) {
return new BroadcastTableBroadcastRouteEngine(rule.filterBroadcastTableNames(tableNames));
private static BroadcastRouteEngine getDALRouteEngine(final Collection<String> broadcastTableNames) {
return new BroadcastTableBroadcastRouteEngine(broadcastTableNames);
}

private static BroadcastRouteEngine getDCLRouteEngine(final BroadcastRule rule, final Collection<String> tableNames) {
Collection<String> broadcastTableNames = rule.filterBroadcastTableNames(tableNames);
return broadcastTableNames.isEmpty() ? new BroadcastIgnoreRouteEngine() : new BroadcastTableBroadcastRouteEngine(broadcastTableNames);
private static BroadcastRouteEngine getDCLRouteEngine(final Collection<String> broadcastTableNames) {
return new BroadcastTableBroadcastRouteEngine(broadcastTableNames);
}

private static BroadcastRouteEngine getDMLRouteEngine(final BroadcastRule rule, final SQLStatementContext sqlStatementContext,
final ConnectionContext connectionContext, final Collection<String> tableNames) {
if (rule.isAllBroadcastTables(tableNames)) {
return sqlStatementContext.getSqlStatement() instanceof SelectStatement
? new BroadcastUnicastRouteEngine(sqlStatementContext, tableNames, connectionContext)
: new BroadcastDatabaseBroadcastRouteEngine();
}
return new BroadcastIgnoreRouteEngine();
private static BroadcastRouteEngine getDMLRouteEngine(final SQLStatementContext sqlStatementContext, final ConnectionContext connectionContext, final Collection<String> broadcastTableNames) {
return sqlStatementContext.getSqlStatement() instanceof SelectStatement
? new BroadcastUnicastRouteEngine(sqlStatementContext, broadcastTableNames, connectionContext)
: new BroadcastDatabaseBroadcastRouteEngine();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ public interface BroadcastRouteEngine {
/**
* Route.
*
* @param routeContext route context
* @param broadcastRule broadcast rule
* @return route context
*/
RouteContext route(RouteContext routeContext, BroadcastRule broadcastRule);
RouteContext route(BroadcastRule broadcastRule);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@
public final class BroadcastDatabaseBroadcastRouteEngine implements BroadcastRouteEngine {

@Override
public RouteContext route(final RouteContext routeContext, final BroadcastRule rule) {
public RouteContext route(final BroadcastRule rule) {
RouteContext result = new RouteContext();
for (String each : rule.getDataSourceNames()) {
routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper(each, each), Collections.emptyList()));
result.getRouteUnits().add(new RouteUnit(new RouteMapper(each, each), Collections.emptyList()));
}
return routeContext;
return result;
}
}
Loading

0 comments on commit 0a161f0

Please sign in to comment.