Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sql federation enumerable modify rule #28762

Merged
merged 9 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptTable.ToRelContext;
import org.apache.calcite.prepare.Prepare.CatalogReader;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.core.TableModify.Operation;
import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.ModifiableTable;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Statistic;
Expand All @@ -48,13 +54,15 @@
import org.apache.shardingsphere.sqlfederation.optimizer.statistic.SQLFederationStatistic;

import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* SQL federation table.
*/
@RequiredArgsConstructor
public final class SQLFederationTable extends AbstractTable implements QueryableTable, TranslatableTable {
public final class SQLFederationTable extends AbstractTable implements ModifiableTable, TranslatableTable {

private final ShardingSphereTable table;

Expand Down Expand Up @@ -124,4 +132,16 @@ public String toString() {
public Statistic getStatistic() {
return statistic;
}

@Override
public Collection<Object[]> getModifiableCollection() {
throw new UnsupportedOperationException();
}

@Override
public TableModify toModificationRel(final RelOptCluster relOptCluster, final RelOptTable table, final CatalogReader schema,
final RelNode relNode, final Operation operation, final List<String> updateColumnList,
final List<RexNode> sourceExpressionList, final boolean flattened) {
return LogicalTableModify.create(table, schema, relNode, operation, updateColumnList, sourceExpressionList, flattened);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.sqlfederation.optimizer.operator.physical;

import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.prepare.Prepare.CatalogReader;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rex.RexNode;

import java.util.List;

/**
* Enumerable modify.
*/
public final class EnumerableModify extends TableModify implements EnumerableRel {

public EnumerableModify(final RelOptCluster cluster, final RelTraitSet traitSet, final RelOptTable table, final CatalogReader catalogReader,
final RelNode input, final Operation operation, final List<String> updateColumnList, final List<RexNode> sourceExpressionList, final boolean flattened) {
super(cluster, traitSet, table, catalogReader, input, operation, updateColumnList, sourceExpressionList, flattened);
}

@Override
public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs) {
return new EnumerableModify(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
}

@Override
public Result implement(final EnumerableRelImplementor implementor, final Prefer prefer) {
// TODO generate modification statements based on dataset and related table information.
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.sqlfederation.optimizer.planner.rule.converter;

import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.schema.ModifiableTable;
import org.apache.shardingsphere.sqlfederation.optimizer.operator.physical.EnumerableModify;

/**
* Enumerable modify converter rule.
*/
public final class EnumerableModifyConverterRule extends ConverterRule {

public static final Config DEFAULT_CONFIG = Config.INSTANCE.withConversion(LogicalTableModify.class, Convention.NONE, EnumerableConvention.INSTANCE,
EnumerableModifyConverterRule.class.getSimpleName()).withRuleFactory(EnumerableModifyConverterRule::new);

private EnumerableModifyConverterRule(final Config config) {
super(config);
}

@Override
public RelNode convert(final RelNode rel) {
TableModify modify = (TableModify) rel;
ModifiableTable modifiableTable = modify.getTable().unwrap(ModifiableTable.class);
if (null == modifiableTable) {
return null;
} else {
RelTraitSet traitSet = modify.getTraitSet().replace(EnumerableConvention.INSTANCE);
return new EnumerableModify(modify.getCluster(), traitSet, modify.getTable(), modify.getCatalogReader(), convert(modify.getInput(), traitSet),
modify.getOperation(), modify.getUpdateColumnList(), modify.getSourceExpressionList(), modify.isFlattened());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sqlfederation.optimizer.metadata.view.ShardingSphereViewExpander;
import org.apache.shardingsphere.sqlfederation.optimizer.planner.rule.converter.EnumerableModifyConverterRule;
import org.apache.shardingsphere.sqlfederation.optimizer.planner.rule.converter.EnumerableScanConverterRule;
import org.apache.shardingsphere.sqlfederation.optimizer.planner.rule.transformation.PushFilterIntoScanRule;
import org.apache.shardingsphere.sqlfederation.optimizer.planner.rule.transformation.PushProjectIntoScanRule;
Expand Down Expand Up @@ -125,13 +126,13 @@ private static void setUpRules(final RelOptPlanner planner) {
planner.addRule(EnumerableRules.ENUMERABLE_TABLE_SPOOL_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_INTERSECT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_MINUS_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_TABLE_MODIFICATION_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_VALUES_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_WINDOW_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_TABLE_FUNCTION_SCAN_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_MATCH_RULE);
planner.addRule(EnumerableScanConverterRule.DEFAULT_CONFIG.toRule());
planner.addRule(EnumerableModifyConverterRule.DEFAULT_CONFIG.toRule());
}

private static Collection<RelOptRule> getSubQueryRules() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Objects;

/**
Expand All @@ -49,7 +50,15 @@ public static TestCasesLoader getInstance() {
* @throws JAXBException exception for parse xml file.
*/
public Collection<TestCase> generate() throws IOException, JAXBException {
URL url = Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("cases/federation-query-sql-cases.xml"));
Collection<TestCase> result = new LinkedList<>();
URL queryCaseUrl = Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("cases/federation-query-sql-cases.xml"));
URL deleteCaseUrl = Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("cases/federation-delete-sql-cases.xml"));
result.addAll(loadTestCase(queryCaseUrl));
result.addAll(loadTestCase(deleteCaseUrl));
return result;
}

private Collection<TestCase> loadTestCase(final URL url) throws IOException, JAXBException {
try (FileReader reader = new FileReader(url.getFile())) {
TestCases testCases = (TestCases) JAXBContext.newInstance(TestCases.class).createUnmarshaller().unmarshal(reader);
return testCases.getTestCases();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?xml version="1.0"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<test-cases>
<test-case sql="DELETE FROM t_order_federate">
<assertion expected-result="EnumerableModify(table=[[federate_jdbc, t_order_federate]], operation=[DELETE], flattened=[false]) EnumerableScan(table=[[federate_jdbc, t_order_federate]], sql=[SELECT * FROM `federate_jdbc`.`t_order_federate`], dynamicParameters=[null]) " />
</test-case>

<test-case sql="DELETE FROM t_single_table WHERE id in (SELECT order_id FROM t_order)">
<assertion expected-result="EnumerableModify(table=[[federate_jdbc, t_single_table]], operation=[DELETE], flattened=[false]) EnumerableCalc(expr#0..3=[{inputs}], proj#0..2=[{exprs}]) EnumerableHashJoin(condition=[=($1, $3)], joinType=[inner]) EnumerableScan(table=[[federate_jdbc, t_single_table]], sql=[SELECT * FROM `federate_jdbc`.`t_single_table`], dynamicParameters=[null]) EnumerableAggregate(group=[{0}]) EnumerableScan(table=[[federate_jdbc, t_order]], sql=[SELECT `order_id` FROM `federate_jdbc`.`t_order`], dynamicParameters=[null]) " />
</test-case>
</test-cases>