Feature configurable calcite bloat (#16248)
* Configurable bloat for calcite ProjectMergeRule implemented

* Comment added

* Default bloat value increased to 1000

* Implemented bloat configuration from QueryContext

* Code refactored, docs updated

---------

Co-authored-by: sviatahorau <[email protected]>
nozjkoitop and sviatahorau authored May 6, 2024
1 parent ac42737 commit b5958b6
Showing 3 changed files with 71 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/querying/query-context.md
@@ -65,6 +65,7 @@ See [SQL query context](sql-query-context.md) for other query context parameters
|`secondaryPartitionPruning`|`true`|Enable secondary partition pruning on the Broker. The Broker will always prune unnecessary segments from the input scan based on a filter on time intervals, but if the data is further partitioned with hash or range partitioning, this option will enable additional pruning based on a filter on secondary partition dimensions.|
|`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following additional logs will be produced:<br />- Log the stack trace of the exception (if any) produced by the query |
|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
|`sqlPlannerBloat`|`1000`|Calcite parameter that controls whether to merge two Project operators when inlining expressions causes complexity to increase; merges whose complexity growth exceeds this threshold are rejected. Implemented as a workaround for the exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]`, which is thrown after Calcite rejects the merge of two Projects.|

## Parameters by query type

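As a usage sketch (not part of this diff): a query context parameter such as the new `sqlPlannerBloat` can be supplied per connection through Druid's Avatica JDBC endpoint, in which case it applies to every query issued over that connection. The snippet below is an illustration under assumptions — the Avatica JDBC driver on the classpath, a Broker at `localhost:8082`, and `2000` as an arbitrary example value.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class SqlPlannerBloatExample
{
  public static void main(String[] args) throws Exception
  {
    // Query context parameters become Avatica connection properties.
    // "sqlPlannerBloat" is the parameter documented above; 2000 is a
    // placeholder value chosen for illustration.
    final Properties connectionProperties = new Properties();
    connectionProperties.setProperty("sqlPlannerBloat", "2000");

    final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
    try (Connection connection = DriverManager.getConnection(url, connectionProperties);
         Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery("SELECT 1")) {
      while (resultSet.next()) {
        System.out.println(resultSet.getLong(1));
      }
    }
  }
}
```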
19 changes: 18 additions & 1 deletion sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java
@@ -38,6 +38,7 @@
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.rel.rules.DateRangeRules;
import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
import org.apache.calcite.rel.rules.ProjectMergeRule;
import org.apache.calcite.rel.rules.PruneEmptyRules;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.SqlExplainFormat;
@@ -83,6 +84,8 @@ public class CalciteRulesManager
private static final int HEP_DEFAULT_MATCH_LIMIT = Integer.parseInt(
System.getProperty(HEP_DEFAULT_MATCH_LIMIT_CONFIG_STRING, "1200")
);
public static final String BLOAT_PROPERTY = "sqlPlannerBloat";
public static final int DEFAULT_BLOAT = 1000;

/**
* Rules from {@link org.apache.calcite.plan.RelOptRules#BASE_RULES}, minus:
@@ -96,12 +99,14 @@ public class CalciteRulesManager
* and {@link CoreRules#FILTER_INTO_JOIN}, which are part of {@link #FANCY_JOIN_RULES}.
* 4) {@link CoreRules#PROJECT_FILTER_TRANSPOSE} because PartialDruidQuery would like to have the Project on top of the Filter -
* this rule could create a lot of non-useful plans.
* 5) {@link CoreRules#PROJECT_MERGE}, which is added later (see {@link #configurableRuleSet}) with the bloat parameter
* configured from the query context, as a workaround for the Calcite exception "there are not enough rules to produce
* a node with desired properties" thrown while planning complex SQL queries with a large number of subqueries.
*/
private static final List<RelOptRule> BASE_RULES =
ImmutableList.of(
CoreRules.AGGREGATE_STAR_TABLE,
CoreRules.AGGREGATE_PROJECT_STAR_TABLE,
CoreRules.PROJECT_MERGE,
CoreRules.FILTER_SCAN,
CoreRules.FILTER_PROJECT_TRANSPOSE,
CoreRules.JOIN_PUSH_EXPRESSIONS,
@@ -452,6 +457,17 @@ public List<RelOptRule> bindableConventionRuleSet(final PlannerContext plannerContext)
.build();
}

/**
* Rules whose configuration depends on the query context. Currently this is only {@link CoreRules#PROJECT_MERGE},
* created with the bloat threshold taken from the context (see {@link #getBloatProperty}).
*/
public List<RelOptRule> configurableRuleSet(PlannerContext plannerContext)
{
return ImmutableList.of(ProjectMergeRule.Config.DEFAULT.withBloat(getBloatProperty(plannerContext)).toRule());
}

/**
* Returns the value of {@link #BLOAT_PROPERTY} from the query context, or {@link #DEFAULT_BLOAT} if unset.
*/
private int getBloatProperty(PlannerContext plannerContext)
{
final Integer bloat = plannerContext.queryContext().getInt(BLOAT_PROPERTY);
return (bloat != null) ? bloat : DEFAULT_BLOAT;
}

public List<RelOptRule> baseRuleSet(final PlannerContext plannerContext)
{
final PlannerConfig plannerConfig = plannerContext.getPlannerConfig();
@@ -461,6 +477,7 @@ public List<RelOptRule> baseRuleSet(final PlannerContext plannerContext)
rules.addAll(BASE_RULES);
rules.addAll(ABSTRACT_RULES);
rules.addAll(ABSTRACT_RELATIONAL_RULES);
rules.addAll(configurableRuleSet(plannerContext));

if (plannerContext.getJoinAlgorithm().requiresSubquery()) {
rules.addAll(FANCY_JOIN_RULES);
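For background, a minimal self-contained sketch of the Calcite API that the new `configurableRuleSet` method builds on: a `ProjectMergeRule` with a custom bloat threshold, registered with a standalone `HepPlanner`. Per Calcite's documentation of `ProjectMergeRule.Config#bloat`, the threshold limits how much plan complexity may increase as a result of merging two Projects. The class name and threshold here are illustrative, not part of the commit.

```java
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.rules.ProjectMergeRule;

public class ProjectMergeBloatSketch
{
  public static void main(String[] args)
  {
    // Same construction as configurableRuleSet: merges of adjacent Project
    // operators are attempted only while the growth in expression count
    // stays within the bloat threshold (1000 mirrors DEFAULT_BLOAT).
    final ProjectMergeRule mergeRule = ProjectMergeRule.Config.DEFAULT
        .withBloat(1000)
        .toRule();

    final HepProgram program = new HepProgramBuilder()
        .addRuleInstance(mergeRule)
        .build();
    final HepPlanner planner = new HepPlanner(program);

    // Given a RelNode tree: planner.setRoot(rel); RelNode best = planner.findBestExp();
    System.out.println("Configured bloat: " + mergeRule.config.bloat());
  }
}
```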
52 changes: 52 additions & 0 deletions sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
@@ -30,6 +30,7 @@
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.rules.ProjectMergeRule;
import org.apache.calcite.schema.Schema;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -61,17 +62,21 @@
import javax.validation.Validator;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;

import static org.apache.calcite.plan.RelOptRule.any;
import static org.apache.calcite.plan.RelOptRule.operand;
import static org.apache.druid.sql.calcite.planner.CalciteRulesManager.BLOAT_PROPERTY;
import static org.apache.druid.sql.calcite.planner.CalciteRulesManager.DEFAULT_BLOAT;

@ExtendWith(EasyMockExtension.class)
public class CalcitePlannerModuleTest extends CalciteTestBase
{
private static final String SCHEMA_1 = "SCHEMA_1";
private static final String SCHEMA_2 = "SCHEMA_2";
private static final String DRUID_SCHEMA_NAME = "DRUID_SCHEMA_NAME";
private static final int BLOAT = 1200;

@Mock
private NamedSchema druidSchema1;
@@ -204,4 +209,51 @@ public void testExtensionCalciteRule()
.contains(customRule);
Assert.assertTrue(containsCustomRule);
}

@Test
public void testConfigurableBloat()
{
ObjectMapper mapper = new DefaultObjectMapper();
PlannerToolbox toolbox = new PlannerToolbox(
injector.getInstance(DruidOperatorTable.class),
macroTable,
mapper,
injector.getInstance(PlannerConfig.class),
rootSchema,
joinableFactoryWrapper,
CatalogResolver.NULL_RESOLVER,
"druid",
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.TEST_AUTHORIZER_MAPPER,
AuthConfig.newBuilder().build()
);

PlannerContext contextWithBloat = PlannerContext.create(
toolbox,
"SELECT 1",
new NativeSqlEngine(queryLifecycleFactory, mapper),
Collections.singletonMap(BLOAT_PROPERTY, BLOAT),
null
);

PlannerContext contextWithoutBloat = PlannerContext.create(
toolbox,
"SELECT 1",
new NativeSqlEngine(queryLifecycleFactory, mapper),
Collections.emptyMap(),
null
);

assertBloat(contextWithBloat, BLOAT);
assertBloat(contextWithoutBloat, DEFAULT_BLOAT);
}

private void assertBloat(PlannerContext context, int expectedBloat)
{
Optional<ProjectMergeRule> firstProjectMergeRule = injector.getInstance(CalciteRulesManager.class).baseRuleSet(context).stream()
.filter(rule -> rule instanceof ProjectMergeRule)
.map(rule -> (ProjectMergeRule) rule)
.findAny();
Assert.assertTrue(firstProjectMergeRule.isPresent() && firstProjectMergeRule.get().config.bloat() == expectedBloat);
}
}
