Skip to content

Commit

Permalink
Refactor broadcast sql router
Browse files Browse the repository at this point in the history
  • Loading branch information
FlyingZC committed Nov 4, 2024
1 parent 3e087d0 commit f416eb4
Showing 1 changed file with 14 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.shardingsphere.broadcast.constant.BroadcastOrder;
import org.apache.shardingsphere.broadcast.route.engine.BroadcastRouteEngineFactory;
import org.apache.shardingsphere.broadcast.route.engine.type.broadcast.BroadcastDatabaseBroadcastRouteEngine;
import org.apache.shardingsphere.broadcast.route.engine.type.broadcast.BroadcastInstanceBroadcastRouteEngine;
import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
Expand Down Expand Up @@ -66,9 +68,7 @@ public final class BroadcastSQLRouter implements EntranceSQLRouter<BroadcastRule
@Override
public RouteContext createRouteContext(final QueryContext queryContext, final RuleMetaData globalRuleMetaData,
final ShardingSphereDatabase database, final BroadcastRule rule, final ConfigurationProperties props) {
RouteContext result = new RouteContext();
BroadcastRouteEngineFactory.newInstance(rule, database, queryContext).route(result, rule);
return result;
return BroadcastRouteEngineFactory.newInstance(rule, database, queryContext).route(new RouteContext(), rule);
}

@Override
Expand All @@ -81,21 +81,21 @@ public void decorateRouteContext(final RouteContext routeContext, final QueryCon
} else if (sqlStatement instanceof DDLStatement) {
decorateRouteContextWhenDDLStatement(routeContext, queryContext, database, rule);
} else if (sqlStatement instanceof DALStatement && isResourceGroupStatement(sqlStatement)) {
routeToAllDatabaseInstances(routeContext, database, rule);
doInstanceBroadcastRoute(routeContext, database, rule);
} else if (sqlStatement instanceof DCLStatement && !isDCLForSingleTable(queryContext.getSqlStatementContext())) {
routeToAllDatabaseInstances(routeContext, database, rule);
doInstanceBroadcastRoute(routeContext, database, rule);
}
}

private void decorateRouteContextWhenTCLStatement(final RouteContext routeContext, final BroadcastRule rule) {
routeToAllDatabases(routeContext, rule);
doDatabaseBroadcastRoute(routeContext, rule);
}

private void decorateRouteContextWhenDDLStatement(final RouteContext routeContext, final QueryContext queryContext, final ShardingSphereDatabase database, final BroadcastRule rule) {
SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext();
if (sqlStatementContext instanceof CursorAvailable) {
if (sqlStatementContext instanceof CloseStatementContext && ((CloseStatementContext) sqlStatementContext).getSqlStatement().isCloseAll()) {
routeToAllDatabases(routeContext, rule);
doDatabaseBroadcastRoute(routeContext, rule);
}
return;
}
Expand All @@ -106,17 +106,17 @@ private void decorateRouteContextWhenDDLStatement(final RouteContext routeContex
boolean functionStatement = sqlStatement instanceof CreateFunctionStatement || sqlStatement instanceof AlterFunctionStatement || sqlStatement instanceof DropFunctionStatement;
boolean procedureStatement = sqlStatement instanceof CreateProcedureStatement || sqlStatement instanceof AlterProcedureStatement || sqlStatement instanceof DropProcedureStatement;
if (functionStatement || procedureStatement) {
routeToAllDatabases(routeContext, rule);
doDatabaseBroadcastRoute(routeContext, rule);
return;
}
// TODO BEGIN extract db route logic to common database router, eg: DCL in instance route @duanzhengqiang
if (sqlStatement instanceof CreateTablespaceStatement || sqlStatement instanceof AlterTablespaceStatement || sqlStatement instanceof DropTablespaceStatement) {
routeToAllDatabaseInstances(routeContext, database, rule);
doInstanceBroadcastRoute(routeContext, database, rule);
}
// TODO END extract db route logic to common database router, eg: DCL in instance route
Collection<String> tableNames = sqlStatementContext instanceof TableAvailable ? getTableNames((TableAvailable) sqlStatementContext) : Collections.emptyList();
if (rule.isAllBroadcastTables(tableNames)) {
routeToAllDatabaseInstances(routeContext, database, rule);
doInstanceBroadcastRoute(routeContext, database, rule);
}
}

Expand Down Expand Up @@ -152,20 +152,14 @@ private boolean isDCLForSingleTable(final SQLStatementContext sqlStatementContex
return false;
}

private void routeToAllDatabases(final RouteContext routeContext, final BroadcastRule rule) {
private void doDatabaseBroadcastRoute(final RouteContext routeContext, final BroadcastRule rule) {
routeContext.getRouteUnits().clear();
for (String each : rule.getDataSourceNames()) {
routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper(each, each), Collections.emptyList()));
}
routeContext.getRouteUnits().addAll(new BroadcastDatabaseBroadcastRouteEngine().route(new RouteContext(), rule).getRouteUnits());
}

private void routeToAllDatabaseInstances(final RouteContext routeContext, final ShardingSphereDatabase database, final BroadcastRule rule) {
private void doInstanceBroadcastRoute(final RouteContext routeContext, final ShardingSphereDatabase database, final BroadcastRule rule) {
routeContext.getRouteUnits().clear();
for (String each : rule.getDataSourceNames()) {
if (database.getResourceMetaData().getAllInstanceDataSourceNames().contains(each)) {
routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper(each, each), Collections.emptyList()));
}
}
routeContext.getRouteUnits().addAll(new BroadcastInstanceBroadcastRouteEngine(database.getResourceMetaData()).route(new RouteContext(), rule).getRouteUnits());
}

@Override
Expand Down

0 comments on commit f416eb4

Please sign in to comment.