Skip to content

Commit

Permalink
fix validator and converter used in multi thread (#30683)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuichenchuxin authored Mar 28, 2024
1 parent 9385008 commit 1c0b753
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@
import lombok.Getter;
import org.apache.calcite.adapter.enumerable.EnumerableInterpretable;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.runtime.Bindable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.datanode.DataNode;
Expand Down Expand Up @@ -51,10 +58,11 @@
import org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationCompilerEngine;
import org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationExecutionPlan;
import org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext;
import org.apache.shardingsphere.sqlfederation.optimizer.context.planner.OptimizerPlannerContext;
import org.apache.shardingsphere.sqlfederation.optimizer.context.planner.OptimizerMetaData;
import org.apache.shardingsphere.sqlfederation.optimizer.exception.syntax.SQLFederationUnsupportedSQLException;
import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.SQLFederationTable;
import org.apache.shardingsphere.sqlfederation.optimizer.planner.cache.ExecutionPlanCacheKey;
import org.apache.shardingsphere.sqlfederation.optimizer.planner.util.SQLFederationPlannerUtils;
import org.apache.shardingsphere.sqlfederation.optimizer.statement.SQLStatementCompiler;
import org.apache.shardingsphere.sqlfederation.resultset.SQLFederationResultSet;
import org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule;
Expand All @@ -79,6 +87,8 @@ public final class SQLFederationEngine implements AutoCloseable {

private static final int DEFAULT_METADATA_VERSION = 0;

private static final JavaTypeFactory DEFAULT_DATA_TYPE_FACTORY = new JavaTypeFactoryImpl();

private final ProcessEngine processEngine = new ProcessEngine();

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -163,8 +173,16 @@ public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUn
try {
String databaseName = federationContext.getQueryContext().getDatabaseNameFromSQLStatement().orElse(this.databaseName);
String schemaName = federationContext.getQueryContext().getSchemaNameFromSQLStatement().orElse(this.schemaName);
SQLFederationExecutionPlan executionPlan = compileQuery(prepareEngine, callback, federationContext, databaseName, schemaName);
resultSet = executePlan(federationContext, executionPlan, databaseName, schemaName);
OptimizerMetaData optimizerMetaData = sqlFederationRule.getOptimizerContext().getMetaData(databaseName);
CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDialectProps());
CalciteCatalogReader catalogReader = SQLFederationPlannerUtils.createCatalogReader(schemaName, optimizerMetaData.getSchema(schemaName), DEFAULT_DATA_TYPE_FACTORY, connectionConfig);
SqlValidator validator = SQLFederationPlannerUtils.createSqlValidator(catalogReader, DEFAULT_DATA_TYPE_FACTORY,
sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDatabaseType(), connectionConfig);
SqlToRelConverter converter = SQLFederationPlannerUtils.createSqlToRelConverter(catalogReader, validator, SQLFederationPlannerUtils.createRelOptCluster(DEFAULT_DATA_TYPE_FACTORY),
sqlFederationRule.getOptimizerContext().getSqlParserRule(), sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDatabaseType(), true);
Schema sqlFederationSchema = catalogReader.getRootSchema().plus().getSubSchema(schemaName);
SQLFederationExecutionPlan executionPlan = compileQuery(prepareEngine, callback, federationContext, databaseName, schemaName, sqlFederationSchema, converter);
resultSet = executePlan(federationContext, executionPlan, validator, converter, databaseName, schemaName, sqlFederationSchema);
return resultSet;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
Expand All @@ -173,30 +191,26 @@ public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUn
}
}

private SQLFederationExecutionPlan compileQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationContext federationContext, final String databaseName,
final String schemaName) {
private SQLFederationExecutionPlan compileQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final JDBCExecutorCallback<? extends ExecuteResult> callback,
final SQLFederationContext federationContext, final String databaseName, final String schemaName, final Schema sqlFederationSchema,
final SqlToRelConverter converter) {
SQLStatementContext sqlStatementContext = federationContext.getQueryContext().getSqlStatementContext();
ShardingSpherePreconditions.checkState(sqlStatementContext instanceof SelectStatementContext, () -> new IllegalArgumentException("SQL statement context must be select statement context."));
OptimizerPlannerContext plannerContext = sqlFederationRule.getOptimizerContext().getPlannerContext(databaseName);
Schema sqlFederationSchema = plannerContext.getValidator(schemaName).getCatalogReader().getRootSchema().plus().getSubSchema(schemaName);
registerTableScanExecutor(sqlFederationSchema, prepareEngine, callback, federationContext, sqlFederationRule.getOptimizerContext(), databaseName, schemaName);
SQLStatementCompiler sqlStatementCompiler = new SQLStatementCompiler(plannerContext.getConverter(schemaName));
SQLStatementCompiler sqlStatementCompiler = new SQLStatementCompiler(converter);
SQLFederationCompilerEngine compilerEngine = new SQLFederationCompilerEngine(databaseName, schemaName, sqlFederationRule.getConfiguration().getExecutionPlanCache());
SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
// TODO open useCache flag when ShardingSphereTable contains version
return compilerEngine.compile(buildCacheKey(federationContext, selectStatementContext, sqlStatementCompiler, databaseName, schemaName), false);
return compilerEngine.compile(buildCacheKey(federationContext, (SelectStatementContext) sqlStatementContext, sqlStatementCompiler, databaseName, schemaName), false);
}

@SuppressWarnings("unchecked")
private ResultSet executePlan(final SQLFederationContext federationContext, final SQLFederationExecutionPlan executionPlan, final String databaseName, final String schemaName) {
private ResultSet executePlan(final SQLFederationContext federationContext, final SQLFederationExecutionPlan executionPlan, final SqlValidator validator, final SqlToRelConverter converter,
final String databaseName, final String schemaName, final Schema sqlFederationSchema) {
try {
Bindable<Object> executablePlan = EnumerableInterpretable.toBindable(Collections.emptyMap(), null, (EnumerableRel) executionPlan.getPhysicalPlan(), EnumerableRel.Prefer.ARRAY);
Map<String, Object> params = createParameters(federationContext.getQueryContext().getParameters());
OptimizerPlannerContext plannerContext = sqlFederationRule.getOptimizerContext().getPlannerContext(databaseName);
Enumerator<Object> enumerator = executablePlan.bind(new SQLFederationBindContext(plannerContext.getValidator(schemaName), plannerContext.getConverter(schemaName), params)).enumerator();
Enumerator<Object> enumerator = executablePlan.bind(new SQLFederationBindContext(validator, converter, params)).enumerator();
ShardingSphereSchema schema = federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName);
Schema sqlFederationSchema = plannerContext.getValidator(schemaName).getCatalogReader().getRootSchema().plus().getSubSchema(schemaName);
return new SQLFederationResultSet(enumerator, schema, sqlFederationSchema, (SelectStatementContext) federationContext.getQueryContext().getSqlStatementContext(),
executionPlan.getResultColumnType());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sqlfederation.optimizer.context.parser.OptimizerParserContext;
import org.apache.shardingsphere.sqlfederation.optimizer.context.planner.OptimizerPlannerContext;
import org.apache.shardingsphere.sqlfederation.optimizer.context.planner.OptimizerMetaData;

import java.util.Map;

Expand All @@ -36,7 +36,7 @@ public final class OptimizerContext {

private final Map<String, OptimizerParserContext> parserContexts;

private final Map<String, OptimizerPlannerContext> plannerContexts;
private final Map<String, OptimizerMetaData> optimizerMetaData;

/**
* Get parser context.
Expand All @@ -49,12 +49,12 @@ public OptimizerParserContext getParserContext(final String databaseName) {
}

/**
* Get planner context.
* Get meta data.
*
* @param databaseName database name
* @return Planner
* @return optimizer meta data
*/
public OptimizerPlannerContext getPlannerContext(final String databaseName) {
return plannerContexts.get(databaseName);
public OptimizerMetaData getMetaData(final String databaseName) {
return optimizerMetaData.get(databaseName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.shardingsphere.parser.rule.builder.SQLParserRuleBuilder;
import org.apache.shardingsphere.sqlfederation.optimizer.context.parser.OptimizerParserContext;
import org.apache.shardingsphere.sqlfederation.optimizer.context.parser.OptimizerParserContextFactory;
import org.apache.shardingsphere.sqlfederation.optimizer.context.planner.OptimizerPlannerContext;
import org.apache.shardingsphere.sqlfederation.optimizer.context.planner.OptimizerPlannerContextFactory;
import org.apache.shardingsphere.sqlfederation.optimizer.context.planner.OptimizerMetaData;
import org.apache.shardingsphere.sqlfederation.optimizer.context.planner.OptimizerMetaDataFactory;

import java.util.Map;
import java.util.Properties;
Expand All @@ -48,7 +48,7 @@ public static OptimizerContext create(final Map<String, ShardingSphereDatabase>
Map<String, OptimizerParserContext> parserContexts = OptimizerParserContextFactory.create(databases);
// TODO consider to use sqlParserRule in global rule
SQLParserRule sqlParserRule = new SQLParserRuleBuilder().build(new DefaultSQLParserRuleConfigurationBuilder().build(), databases, new ConfigurationProperties(new Properties()));
Map<String, OptimizerPlannerContext> plannerContexts = OptimizerPlannerContextFactory.create(databases, parserContexts, sqlParserRule);
return new OptimizerContext(sqlParserRule, parserContexts, plannerContexts);
Map<String, OptimizerMetaData> optimizerMetaData = OptimizerMetaDataFactory.create(databases);
return new OptimizerContext(sqlParserRule, parserContexts, optimizerMetaData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,25 @@
package org.apache.shardingsphere.sqlfederation.optimizer.context.planner;

import lombok.RequiredArgsConstructor;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.schema.Schema;

import java.util.Map;

/**
* Optimize planner context.
* Optimize meta data.
*/
@RequiredArgsConstructor
public final class OptimizerPlannerContext {
public final class OptimizerMetaData {

private final Map<String, SqlValidator> validators;

private final Map<String, SqlToRelConverter> converters;

/**
* Get validator.
*
* @param schemaName schema name
* @return validator
*/
public SqlValidator getValidator(final String schemaName) {
return validators.get(schemaName);
}
private final Map<String, Schema> schemas;

/**
* Get converter.
* Get schema.
*
* @param schemaName schema name
* @return converter
* @return schema
*/
public SqlToRelConverter getConverter(final String schemaName) {
return converters.get(schemaName);
public Schema getSchema(final String schemaName) {
return schemas.get(schemaName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.sqlfederation.optimizer.context.planner;

import com.cedarsoftware.util.CaseInsensitiveMap;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.schema.Schema;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.SQLFederationSchema;

import java.util.Map;
import java.util.Map.Entry;

/**
* Optimizer meta data factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class OptimizerMetaDataFactory {

private static final JavaTypeFactory DEFAULT_DATA_TYPE_FACTORY = new JavaTypeFactoryImpl();

/**
* Create optimizer meta data map.
*
* @param databases databases
* @return created optimizer planner context map
*/
public static Map<String, OptimizerMetaData> create(final Map<String, ShardingSphereDatabase> databases) {
Map<String, OptimizerMetaData> result = new CaseInsensitiveMap<>(databases.size(), 1F);
for (Entry<String, ShardingSphereDatabase> entry : databases.entrySet()) {
result.put(entry.getKey(), create(entry.getValue()));
}
return result;
}

/**
* Create optimizer meta data.
*
* @param database database
* @return created optimizer planner context
*/
public static OptimizerMetaData create(final ShardingSphereDatabase database) {
Map<String, Schema> schemas = new CaseInsensitiveMap<>();
for (Entry<String, ShardingSphereSchema> entry : database.getSchemas().entrySet()) {
Schema sqlFederationSchema = new SQLFederationSchema(entry.getKey(), entry.getValue(), database.getProtocolType(), DEFAULT_DATA_TYPE_FACTORY);
schemas.put(entry.getKey(), sqlFederationSchema);
}
return new OptimizerMetaData(schemas);
}
}
Loading

0 comments on commit 1c0b753

Please sign in to comment.