
Calcite Integration

Paul Rogers edited this page Nov 10, 2022 · 7 revisions

Basics

Druid's schemas, etc. are configured in the DruidCalciteSchemaModule class.

Schema

DruidSchemaCatalog is the root schema, created by the RootSchemaProvider. The schemas are given by an injected Set<NamedSchema>. The set of schemas is defined in DruidCalciteSchemaModule via a Guice multibinder.

List of schemas:

  • NamedDruidSchema, DruidSchema for the druid name space.
  • NamedLookupSchema, LookupSchema for the lookup name space.
  • NamedSystemSchema, SystemSchema for the sys name space.
  • NamedViewSchema, ViewSchema for the view name space. The view implementation in Druid is a stub. Extensions provide the actual view storage.
  • InformationSchema (no named schema) for Calcite's built-in INFORMATION_SCHEMA name space.

The schema bootstrap process is:

  • A schema provider uses Guice to add a NamedSchema to a Guice multibinder by calling SqlBindings.addSchema(). Druid's built-in schemas are added this way in DruidCalciteSchemaModule.
  • RootSchemaProvider creates the root schema via Guice, building it from the set of NamedSchema objects bound in Guice. RootSchemaProvider can be seen as the bridge between Guice-land and Calcite-land.
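The bridge from a Set<NamedSchema> to a root schema can be sketched without Guice as follows. This is a hypothetical, simplified model: NamedSchemaSketch and RootSchemaProviderSketch are illustrative names, the set is passed in by hand rather than injected by a multibinder, and a plain map stands in for Calcite's root schema. Rejecting duplicate names is a choice made for this sketch.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for Druid's NamedSchema: a namespace name plus a schema object.
interface NamedSchemaSketch {
  String getSchemaName();
  Object getSchema();
}

// Hypothetical stand-in for RootSchemaProvider. In Druid the set would be injected by
// Guice; here the caller passes it in directly.
final class RootSchemaProviderSketch {
  private final Set<NamedSchemaSketch> namedSchemas;

  RootSchemaProviderSketch(Set<NamedSchemaSketch> namedSchemas) {
    this.namedSchemas = new LinkedHashSet<>(namedSchemas);
  }

  // Builds the "root schema" as a map from namespace name to schema, rejecting duplicates.
  Map<String, Object> get() {
    Map<String, Object> root = new TreeMap<>();
    for (NamedSchemaSketch named : namedSchemas) {
      Object previous = root.putIfAbsent(named.getSchemaName(), named.getSchema());
      if (previous != null) {
        throw new IllegalStateException("Duplicate schema name: " + named.getSchemaName());
      }
    }
    return root;
  }

  // Convenience factory for simple named schemas.
  static NamedSchemaSketch named(String name, Object schema) {
    return new NamedSchemaSketch() {
      @Override
      public String getSchemaName() {
        return name;
      }

      @Override
      public Object getSchema() {
        return schema;
      }
    };
  }
}
```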

DruidSchema

Holds the segment metadata, grouped by datasource (a datasource does not exist as a separate entity, only as a collection of segments):

  private final ConcurrentHashMap<String, ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata>> segmentMetadataInfo
      = new ConcurrentHashMap<>();

Holds the list of "druid tables" as a hash map:

  private final ConcurrentMap<String, DruidTable> tables = new ConcurrentHashMap<>();

The list of segments (and thus of tables) is cached. Since the cache is shared, Druid presumably handles all segments the same way.

Most of this class appears to be devoted to the complex code that manages the segment cache.
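The two-level structure of segmentMetadataInfo can be illustrated with a small model. This is a hypothetical sketch: SegmentCacheSketch is an illustrative name, a String segment ID and a row count stand in for SegmentId and AvailableSegmentMetadata, and deriving table names directly from the map keys is an assumption about how tables relate to segments.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical, simplified model of DruidSchema's segment cache.
final class SegmentCacheSketch {
  // datasource name -> (segment ID -> row count), mirroring the shape of segmentMetadataInfo
  private final ConcurrentHashMap<String, ConcurrentSkipListMap<String, Long>> segmentMetadataInfo =
      new ConcurrentHashMap<>();

  void addSegment(String dataSource, String segmentId, long numRows) {
    segmentMetadataInfo
        .computeIfAbsent(dataSource, ds -> new ConcurrentSkipListMap<>())
        .put(segmentId, numRows);
  }

  void removeSegment(String dataSource, String segmentId) {
    // Returning null from computeIfPresent removes the entry: once its last segment
    // is gone, the datasource no longer exists.
    segmentMetadataInfo.computeIfPresent(dataSource, (ds, segments) -> {
      segments.remove(segmentId);
      return segments.isEmpty() ? null : segments;
    });
  }

  // In this sketch, the table names are exactly the datasources that have segments.
  Set<String> tableNames() {
    return new TreeSet<>(segmentMetadataInfo.keySet());
  }

  long totalRows(String dataSource) {
    Map<String, Long> segments =
        segmentMetadataInfo.getOrDefault(dataSource, new ConcurrentSkipListMap<>());
    long total = 0;
    for (long rows : segments.values()) {
      total += rows;
    }
    return total;
  }
}
```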

Tables

Calcite's table hierarchy:

  • Table: Generic table definition.
  • TranslatableTable: Table which can be translated to a relational expression.

A DruidTable represents a datasource: it implements TranslatableTable and holds an associated DataSource. Here the naming gets confusing: a DataSource is a JSON-serializable object that represents several kinds of things, including a "table" (datasource). DataSource is a non-SQL concept, and so has a foot in Druid-land.

Functions

Druid maintains a DruidOperatorTable for the Druid-defined operators. Definitions are injected via Guice as SqlOperatorConversion instances, each of which provides Druid-specific functionality plus a calciteOperator() method that produces the Calcite form of the operator, which is what Calcite itself uses. An operator describes a kind of AST node and is shared, so it is constant; the corresponding AST node instance is a SqlCall of some form.
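The pairing of a conversion with its Calcite operator, and an operator table indexed by name, can be sketched as below. Everything here is a hypothetical simplification: OperatorSketch stands in for SqlOperator, OperatorConversionSketch for SqlOperatorConversion (whose real toDruidExpression takes planner and row-signature arguments), and OperatorTableSketch for DruidOperatorTable. Case-insensitive lookup is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Stand-in for Calcite's SqlOperator: one shared, immutable instance per function.
final class OperatorSketch {
  final String name;

  OperatorSketch(String name) {
    this.name = name;
  }
}

// Stand-in for SqlOperatorConversion: Druid-specific logic plus the Calcite operator.
interface OperatorConversionSketch {
  OperatorSketch calciteOperator();

  String toDruidExpression(List<String> args); // simplified, illustrative signature
}

// Stand-in for DruidOperatorTable: indexes the injected conversions by operator name.
final class OperatorTableSketch {
  private final Map<String, OperatorConversionSketch> byName = new HashMap<>();

  OperatorTableSketch(List<OperatorConversionSketch> conversions) {
    for (OperatorConversionSketch c : conversions) {
      byName.put(c.calciteOperator().name.toUpperCase(Locale.ROOT), c);
    }
  }

  // What the validator would see: the Calcite operator, or null if unknown.
  OperatorSketch lookup(String name) {
    OperatorConversionSketch c = byName.get(name.toUpperCase(Locale.ROOT));
    return c == null ? null : c.calciteOperator();
  }

  // What the Druid translation step would use: the conversion for an operator.
  OperatorConversionSketch conversionFor(OperatorSketch op) {
    return byName.get(op.name.toUpperCase(Locale.ROOT));
  }
}
```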

For a table macro, there appear to be multiple levels.

  • ExternalOperatorConversion: Druid-specific, provides the operator: ExternalOperator
  • ExternalOperator extends SqlUserDefinedTableMacro, which defines the argument signature.
  • The actual work is done by ExternalTableMacro implements TableMacro, injected into ExternalOperatorConversion, and passed to the SqlUserDefinedTableMacro constructor.
  • ExternalTableMacro's TranslatableTable apply(final List<Object> arguments) method accepts a list of positional arguments, parses them as JSON, and builds the DruidTable.
  • The SqlUserDefinedTableMacro class provides TranslatableTable getTable(...), which converts positional operands from Calcite to Java form and calls the above apply(...) method.
  • ProcedureNamespace.validateImpl() checks whether the operator is a SqlUserDefinedTableMacro. If so, it passes the call binding's arguments to getTable(...) and uses the resulting table to get the table type.
  • Later, SqlToRelConverter.convertCollectionTable() expects an operator of type SqlUserDefinedTableMacro and calls getTable(...) again, this time apparently with the operands unconverted (?).
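The layering above can be sketched in plain Java. All names are hypothetical stand-ins: TableMacroSketch plays Calcite's TableMacro, ExternTableMacroSketch plays ExternalTableMacro (which really parses JSON; this sketch only casts strings), and UserDefinedTableMacroSketch plays SqlUserDefinedTableMacro, converting quoted SQL string literals to Java values before delegating. The call counter just makes repeated resolution visible.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the table that ExternalTableMacro builds; fields mirror ExternalDataSource.
final class ExternTableSketch {
  final String inputSource;
  final String inputFormat;
  final String signature;

  ExternTableSketch(String inputSource, String inputFormat, String signature) {
    this.inputSource = inputSource;
    this.inputFormat = inputFormat;
    this.signature = signature;
  }
}

// Plays Calcite's TableMacro: apply() receives Java values in positional order.
interface TableMacroSketch {
  ExternTableSketch apply(List<Object> arguments);
}

// Plays ExternalTableMacro: checks the argument count and casts each value to String.
final class ExternTableMacroSketch implements TableMacroSketch {
  @Override
  public ExternTableSketch apply(List<Object> arguments) {
    if (arguments.size() != 3) {
      throw new IllegalArgumentException("Expected 3 arguments, got " + arguments.size());
    }
    return new ExternTableSketch(
        (String) arguments.get(0), (String) arguments.get(1), (String) arguments.get(2));
  }
}

// Plays SqlUserDefinedTableMacro: converts SQL-form operands, then delegates to apply().
final class UserDefinedTableMacroSketch {
  private final TableMacroSketch macro;
  private int resolutions = 0;

  UserDefinedTableMacroSketch(TableMacroSketch macro) {
    this.macro = macro;
  }

  ExternTableSketch getTable(List<String> operands) {
    resolutions++;
    List<Object> javaArgs = new ArrayList<>(operands.size());
    for (String operand : operands) {
      javaArgs.add(unquote(operand));
    }
    return macro.apply(javaArgs);
  }

  int resolutions() {
    return resolutions;
  }

  // Strips surrounding single quotes and un-escapes doubled quotes, as in a SQL literal.
  static String unquote(String literal) {
    if (literal.length() >= 2 && literal.startsWith("'") && literal.endsWith("'")) {
      return literal.substring(1, literal.length() - 1).replace("''", "'");
    }
    return literal;
  }
}
```

Calling getTable() once for validation and again for conversion shows the double resolution described in the steps above.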

Notes:

  • The above suggests there is benefit in doing some caching in the AST nodes to avoid multiple resolutions of the function.
  • All the validation wants is the table type (schema). When this is provided externally (via the mechanism we're trying to create), we don't even need the table.
  • SqlUserDefinedTableMacro handles conversion of operands (twice). It is the thing to change to change how operands are handled.
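The caching idea in the first note can be sketched as a memoizing resolver. This is a hypothetical illustration: CachingTableResolver is an invented name, and keying the cache on the operand list is an assumption; a real implementation might instead attach the resolved table to the call node itself.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: remember the table resolved for a given set of operands so that
// validation and SQL-to-rel conversion share a single resolution.
final class CachingTableResolver<T> {
  private final Function<List<String>, T> resolver;
  private final Map<List<String>, T> cache = new HashMap<>();
  private int resolutions = 0;

  CachingTableResolver(Function<List<String>, T> resolver) {
    this.resolver = resolver;
  }

  T resolve(List<String> operands) {
    // Copy the key so later changes to the caller's list cannot corrupt the cache.
    return cache.computeIfAbsent(List.copyOf(operands), ops -> {
      resolutions++;
      return resolver.apply(ops);
    });
  }

  int resolutions() {
    return resolutions;
  }
}
```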

Call resolution sequence:

  • ProcedureNamespace.validateImpl(...) special-cases SqlUserDefinedTableMacro.
  • SqlCallBinding permutes operands (hence it must be replaced to handle our case).
  • SqlUserDefinedTableMacro converts operands to Java. However, those methods are private, so they must be copied.
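To make "permutes operands" concrete: with Calcite's named-argument syntax (name => value), arguments may arrive in any order and must be rearranged into the operator's declared parameter order. The following is a simplified, hypothetical illustration (OperandPermuter is an invented name), not Calcite's SqlCallBinding code; filling omitted parameters with null is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of operand permutation: named arguments are reordered into the
// declared parameter order, with null for any parameter the call omitted.
final class OperandPermuter {
  static List<Object> permute(List<String> declaredParams, Map<String, Object> namedArgs) {
    for (String name : namedArgs.keySet()) {
      if (!declaredParams.contains(name)) {
        throw new IllegalArgumentException("Unknown parameter: " + name);
      }
    }
    List<Object> positional = new ArrayList<>(declaredParams.size());
    for (String param : declaredParams) {
      positional.add(namedArgs.get(param)); // null if the call omitted this parameter
    }
    return positional;
  }
}
```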

External Tables

ExternalDataSource represents plan-only external tables and is derived from DataSource. It consists of:

  private final InputSource inputSource;
  private final InputFormat inputFormat;
  private final RowSignature signature;

It is created by ExternalTableMacro.

ExternalTableScan: "Represents a scan of an external table. Generated by DruidTable when its datasource is an ExternalDataSource." This seems a bit awkward: a DruidTable represents not only a datasource, but also an external table.
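The dual role of DruidTable can be modeled as one table class whose translation depends on the kind of datasource it wraps. This is a hypothetical sketch: DataSourceKind, TableSource, ExternalSource, and DruidTableLike are invented names, strings stand in for InputSource, InputFormat, RowSignature and Calcite RelNodes, and "TableScan" is just a label for the non-external case.

```java
// Hypothetical marker for the native DataSource kinds in this sketch.
interface DataSourceKind {}

// A regular table datasource.
final class TableSource implements DataSourceKind {
  final String name;

  TableSource(String name) {
    this.name = name;
  }
}

// Mirrors ExternalDataSource's three fields using plain strings.
final class ExternalSource implements DataSourceKind {
  final String inputSource;
  final String inputFormat;
  final String signature;

  ExternalSource(String inputSource, String inputFormat, String signature) {
    this.inputSource = inputSource;
    this.inputFormat = inputFormat;
    this.signature = signature;
  }
}

// A DruidTable-like class: a SQL-side table that carries a native datasource.
final class DruidTableLike {
  private final DataSourceKind dataSource;

  DruidTableLike(DataSourceKind dataSource) {
    this.dataSource = dataSource;
  }

  // Stands in for TranslatableTable.toRel(): chooses the scan node by datasource type.
  String toRel() {
    if (dataSource instanceof ExternalSource) {
      ExternalSource ext = (ExternalSource) dataSource;
      return "ExternalTableScan(" + ext.inputSource + ", " + ext.inputFormat + ", " + ext.signature + ")";
    }
    if (dataSource instanceof TableSource) {
      return "TableScan(" + ((TableSource) dataSource).name + ")";
    }
    throw new IllegalArgumentException("Unsupported datasource: " + dataSource);
  }
}
```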

Planner

The DruidPlanner appears to have started by planning only SELECT queries, then evolved to handle EXPLAIN and INSERT. Since both of these contain a SELECT internally, the code has a complex layer of if-statements to handle the various pieces. Further, the planner is designed to start over from parsing on each of the validate, prepare, and plan steps. The result is a rather complex bit of code!

Major players:

  • SqlResource: HTTP endpoint for request/response queries.
  • SqlLifecycle: Abstraction over the planning and execution process.
  • DruidPlanner: Wrapper for the Calcite planner.

Minor players:

  • PlannerResult: returns an executable result from the planner along with row schema. Held by SqlLifecycle.

Planner Interface

The planner is a concrete class. The following is its (pretend) interface:

public interface DruidPlanner extends AutoCloseable
{
  /**
   * Validates a SQL query and populates {@link PlannerContext#getResourceActions()}.
   *
   * @return a {@link ValidationResult} holding the resource actions to authorize, such as those for any Druid
   *         datasources or views which are taking part in the query.
   */
  ValidationResult validate(boolean authorizeContextParams) throws SqlParseException, ValidationException;

  /**
   * Prepare an SQL query for execution, including some initial parsing and validation and any dynamic parameter type
   * resolution, to support prepared statements via JDBC.
   *
   * In some future this could perhaps re-use some of the work done by {@link #validate(boolean)}
   * instead of repeating it, but that day is not today.
   */
  PrepareResult prepare() throws SqlParseException, ValidationException, RelConversionException;

  /**
   * Plan an SQL query for execution, returning a {@link PlannerResult} which can be used to actually execute the query.
   *
   * Ideally, the query can be planned into a native Druid query, using {@link #planWithDruidConvention}, but will
   * fall-back to {@link #planWithBindableConvention} if this is not possible.
   *
   * In some future this could perhaps re-use some of the work done by {@link #validate(boolean)}
   * instead of repeating it, but that day is not today.
   */
  PlannerResult plan() throws SqlParseException, ValidationException, RelConversionException;

  PlannerContext getPlannerContext();

  // Narrows AutoCloseable.close() (no checked exception) so try-with-resources needs no catch clause.
  @Override
  void close();
}

PlannerFactory

Guice-created singleton. Key methods:

  /**
   * Create a Druid query planner from an initial query context
   */
  public DruidPlanner createPlanner(final String sql, final QueryContext queryContext)
  {
    final PlannerContext context = PlannerContext.create(
        sql,
        operatorTable,
        macroTable,
        jsonMapper,
        plannerConfig,
        rootSchema,
        queryContext
    );

    return createPlannerWithContext(context);
  }

  /**
   * Create a new Druid query planner, re-using a previous {@link PlannerContext}
   */
  public DruidPlanner createPlannerWithContext(final PlannerContext plannerContext)
  {
    return new DruidPlanner(buildFrameworkConfig(plannerContext), plannerContext, queryMakerFactory);
  }

The PlannerContext passes information into the planner. It carries two kinds of information: per-query information (the SQL text and query context) and system-wide configured information (everything else).
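The split between the two kinds of information can be sketched as a long-lived factory that holds system-wide settings and stamps out one context per query. This is a hypothetical model: PlannerFactorySketch and PlannerContextSketch are invented names, and maps stand in for Druid's operator table, JSON mapper, planner config, root schema, and query context.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-query context: per-query fields plus a reference to shared config.
final class PlannerContextSketch {
  final String sql;                       // per-query
  final Map<String, Object> queryContext; // per-query
  final Map<String, Object> systemConfig; // shared across all queries

  PlannerContextSketch(String sql, Map<String, Object> queryContext, Map<String, Object> systemConfig) {
    this.sql = sql;
    this.queryContext = queryContext;
    this.systemConfig = systemConfig;
  }
}

// Hypothetical singleton factory holding the system-wide configuration.
final class PlannerFactorySketch {
  private final Map<String, Object> systemConfig;

  PlannerFactorySketch(Map<String, Object> systemConfig) {
    this.systemConfig = Map.copyOf(systemConfig); // immutable, so safe to share
  }

  PlannerContextSketch createContext(String sql, Map<String, Object> queryContext) {
    // Copy the per-query context so the caller cannot mutate it after planning starts.
    return new PlannerContextSketch(sql, new HashMap<>(queryContext), systemConfig);
  }
}
```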

Besides SqlLifecycle.prepare() and SqlLifecycle.plan() (see below), createPlannerWithContext() is called in only one test, SqlLifecycleTest, where it is mocked.

SqlLifecycle

The planner is effectively tightly coupled with SqlLifecycle. Basic protocol from SqlResource:

// Create
SqlLifecycle lifecycle = sqlLifecycleFactory.factorize();
// Gather the query and context
lifecycle.initialize(sqlQuery.getQuery(), new QueryContext(sqlQuery.getContext()));
// Sets parameters. Ambiguous: the planner context may or may not have been created yet.
lifecycle.setParameters(sqlQuery.getParameterList());
// One pass through the planner, as described below.
lifecycle.validateAndAuthorize(req);
// Another pass through the planner, as described below.
lifecycle.plan();
lifecycle.createRowTransformer();
// Returns a Sequence<Object[]>
Sequence<Object[]> results = lifecycle.execute();

The planner itself is created, used, and discarded within several of the above operations. Because the planner is not persistent, the lifecycle must capture the output of each step for later use; it cannot simply obtain those values from the planner when needed.
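The create-use-discard pattern can be sketched as follows. This is a hypothetical model: PlannerSketch and LifecycleSketch are invented names, each step opens a fresh AutoCloseable planner that re-parses the SQL, and the lifecycle keeps each step's result because the planner is gone once its try-with-resources block ends. Requiring validate() before plan() is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical planner: records open/parse/close events so the pattern is visible.
final class PlannerSketch implements AutoCloseable {
  private final String sql;
  private final List<String> log;

  PlannerSketch(String sql, List<String> log) {
    this.sql = sql;
    this.log = log;
    log.add("open");
  }

  private String parse() {
    log.add("parse"); // every step starts over from parsing
    return sql.trim();
  }

  String validate() {
    return "validated:" + parse();
  }

  String plan() {
    return "planned:" + parse();
  }

  @Override
  public void close() {
    log.add("close");
  }
}

// Hypothetical lifecycle: captures each step's output, since each planner is discarded.
final class LifecycleSketch {
  final List<String> log = new ArrayList<>();
  private final String sql;
  private String validationResult;
  private String plannerResult;

  LifecycleSketch(String sql) {
    this.sql = sql;
  }

  void validate() {
    try (PlannerSketch planner = new PlannerSketch(sql, log)) {
      validationResult = planner.validate();
    }
  }

  void plan() {
    if (validationResult == null) {
      throw new IllegalStateException("validate() must run first");
    }
    try (PlannerSketch planner = new PlannerSketch(sql, log)) {
      plannerResult = planner.plan();
    }
  }

  String plannerResult() {
    return plannerResult;
  }
}
```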

Validate

Called from SqlLifecycle.validate(), which does a one-time pass through the planner:

  private ValidationResult validate(AuthenticationResult authenticationResult)
  {
    try (DruidPlanner planner = plannerFactory.createPlanner(sql, queryContext)) {
      // set planner context for logs/metrics in case something explodes early
      this.plannerContext = planner.getPlannerContext();
      this.plannerContext.setAuthenticationResult(authenticationResult);
      // set parameters on planner context, if parameters have already been set
      this.plannerContext.setParameters(parameters);
      this.validationResult = planner.validate(authConfig.authorizeQueryContextParams());
      return validationResult;
    }

The plannerContext is retained after the planner is closed, even though it was created for, and updated in, this planning session.

Prepare

Called from SqlLifecycle.prepare():

  /**
   * Prepare the query lifecycle for execution, without completely planning into something that is executable, but
   * including some initial parsing and validation and any dynamic parameter type resolution, to support prepared
   * statements via JDBC.
   */
  public PrepareResult prepare() throws RelConversionException
  {
    ...
    try (DruidPlanner planner = plannerFactory.createPlannerWithContext(plannerContext)) {
      this.prepareResult = planner.prepare();
      return prepareResult;
    }

Plan

Called from SqlLifecycle.plan():

  public void plan() throws RelConversionException
  {
    ...
    try (DruidPlanner planner = plannerFactory.createPlannerWithContext(plannerContext)) {
      this.plannerResult = planner.plan();
    }

Execute

Called from SqlLifecycle.execute():

  public Sequence<Object[]> execute()
  {
    return plannerResult.run();
  }

Execution is delegated to PlannerResult, which has the following form:

  private final Supplier<Sequence<Object[]>> resultsSupplier;

  public Sequence<Object[]> run()
  {
    return resultsSupplier.get();
  }
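The effect of the Supplier-based design is that planning produces a deferred computation and nothing runs until run() is called. A hypothetical sketch: PlannerResultSketch is an invented name, java.util.stream.Stream stands in for Druid's Sequence, and the column list stands in for the row schema returned with the result.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical sketch of PlannerResult's shape: a supplier of results plus a row signature.
final class PlannerResultSketch {
  private final Supplier<Stream<Object[]>> resultsSupplier;
  private final List<String> rowSignature;

  PlannerResultSketch(Supplier<Stream<Object[]>> resultsSupplier, List<String> rowSignature) {
    this.resultsSupplier = resultsSupplier;
    this.rowSignature = rowSignature;
  }

  List<String> rowSignature() {
    return rowSignature;
  }

  // Each call invokes the supplier; this sketch does not cache results.
  Stream<Object[]> run() {
    return resultsSupplier.get();
  }
}
```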

Consumers

  • The Avatica path, which creates a connection-oriented JDBC session. Is this the only consumer of prepare()?
  • SqlResource
  • Extensions

Comparison of the Three Planner Passes

Setup

The plannerFactory is a global singleton: only the values passed to planner creation vary.

Validate

SqlLifecycle.validate():

    try (DruidPlanner planner = plannerFactory.createPlanner(sql, queryContext)) {
      plannerContext = planner.getPlannerContext();
      plannerContext.setAuthenticationResult(authenticationResult);
      // set parameters on planner context, if parameters have already been set
      plannerContext.setParameters(parameters);
      validationResult = planner.validate(authConfig.authorizeQueryContextParams());
    }

  • Creates a planner with the query and context.
  • Passes in the authentication result (information needed to authorize resources).
  • Passes in parameter values.
  • Identifies whether to authorize the query context parameters.

Prepare

SqlLifecycle.prepare():

    try (DruidPlanner planner = plannerFactory.createPlannerWithContext(plannerContext)) {
      this.prepareResult = planner.prepare();
    }

  • Reuses the planner context already created (presumably by validate()).

Plan

SqlLifecycle.plan():

    try (DruidPlanner planner = plannerFactory.createPlannerWithContext(plannerContext)) {
      this.plannerResult = planner.plan();
    }

  • Reuses the planner context already created (presumably by validate()).

Parse

The Parse step produces a tree of SqlNodes. The parse step is the same for all three paths:

Validate

    resetPlanner();
    final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()));

Prepare

    resetPlanner();
    final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()));

Plan

    resetPlanner();
    final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()));

Validate

Performs Calcite SQL validation, which returns a new, validated node. Calcite has a self-contained validation method, but Druid creates its own validator to capture parameter types. Prepare, however, uses the Calcite version.

Dynamic parameters are rewritten inline. The validate path wraps any RuntimeException in a ValidationException; the plan path does not.

Validate

    try {
      validatedQueryNode = validator.validate(rewriteDynamicParameters(parsed.getQueryNode()));
    }
    catch (RuntimeException e) {
      throw new ValidationException(e);
    }

Prepare

Prepare actually validates twice: once with the stock Calcite validator, and a second time with the Druid validator.

    final SqlNode validatedQueryNode = planner.validate(parsed.getQueryNode());
    ...
    final RelDataType parameterTypes = validator.getParameterRowType(validator.validate(validatedQueryNode));

Plan

This path does not handle RuntimeException. Perhaps it cannot fail here because the earlier validation checks have already passed?

    final SqlNode parameterizedQueryNode = rewriteDynamicParameters(parsed.getQueryNode());
    final SqlNode validatedQueryNode = planner.validate(parameterizedQueryNode);
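What "rewriting dynamic parameters" amounts to can be illustrated on a toy token list, assuming that rewriteDynamicParameters substitutes the supplied parameter values as literals for the ? placeholders. The real code walks Calcite SqlNode trees; ParameterRewriter and its literal formatting are hypothetical, and throwing on a missing value is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: replace each "?" token, left to right, with a SQL literal.
final class ParameterRewriter {
  static List<String> rewrite(List<String> tokens, List<Object> parameters) {
    List<String> out = new ArrayList<>(tokens.size());
    int next = 0;
    for (String token : tokens) {
      if (!token.equals("?")) {
        out.add(token);
        continue;
      }
      if (next >= parameters.size()) {
        throw new IllegalArgumentException("No value for parameter " + (next + 1));
      }
      out.add(toLiteral(parameters.get(next++)));
    }
    return out;
  }

  // Formats a Java value as a SQL literal, doubling embedded single quotes in strings.
  private static String toLiteral(Object value) {
    if (value == null) {
      return "NULL";
    }
    if (value instanceof Number) {
      return value.toString();
    }
    if (value instanceof Boolean) {
      return ((Boolean) value) ? "TRUE" : "FALSE";
    }
    return "'" + value.toString().replace("'", "''") + "'";
  }
}
```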

Convert to Relational Algebra

Converts the SQL parse tree to a tree of relational operator nodes (RelNodes).

Nothing happens here for validate, which works only with SqlNodes.

Prepare

    final RelRoot rootQueryRel = planner.rel(validatedQueryNode);

Plan

    final RelRoot rootQueryRel = planner.rel(validatedQueryNode);

Specific functionality

Validate

Collect the resource actions that must be authorized:

    SqlResourceCollectorShuttle resourceCollectorShuttle = new SqlResourceCollectorShuttle(validator, plannerContext);
    validatedQueryNode.accept(resourceCollectorShuttle);

    final Set<ResourceAction> resourceActions = new HashSet<>(resourceCollectorShuttle.getResourceActions());

    if (parsed.getInsertNode() != null) {
      final String targetDataSource = validateAndGetDataSourceForInsert(parsed.getInsertNode());
      resourceActions.add(new ResourceAction(new Resource(targetDataSource, ResourceType.DATASOURCE), Action.WRITE));
    }
    if (authorizeContextParams) {
      plannerContext.getQueryContext().getUserParams().keySet().forEach(contextParam -> resourceActions.add(
          new ResourceAction(new Resource(contextParam, ResourceType.QUERY_CONTEXT), Action.WRITE)
      ));
    }

    plannerContext.setResourceActions(resourceActions);
    return new ValidationResult(resourceActions);
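The collection logic above reduces to three rules. A hypothetical sketch using strings instead of Druid's Resource and ResourceAction types (ResourceActionCollector is an invented name): presumably READ on each datasource the shuttle finds, WRITE on the INSERT target, and WRITE on each user-supplied context parameter when authorizeContextParams is set.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of the resource-action rules, encoded as "TYPE:name:ACTION" strings.
final class ResourceActionCollector {
  static Set<String> collect(
      Set<String> readDataSources,
      String insertTarget, // null when the statement is not an INSERT
      Set<String> userContextParams,
      boolean authorizeContextParams) {
    Set<String> actions = new LinkedHashSet<>();
    for (String ds : readDataSources) {
      actions.add("DATASOURCE:" + ds + ":READ");
    }
    if (insertTarget != null) {
      actions.add("DATASOURCE:" + insertTarget + ":WRITE");
    }
    if (authorizeContextParams) {
      for (String param : userContextParams) {
        actions.add("QUERY_CONTEXT:" + param + ":WRITE");
      }
    }
    return actions;
  }
}
```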

Prepare

    final SqlValidator validator = getValidator();
    final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
    final RelDataType parameterTypes = validator.getParameterRowType(validator.validate(validatedQueryNode));
    final RelDataType returnedRowType;

    if (parsed.getExplainNode() != null) {
      returnedRowType = getExplainStructType(typeFactory);
    } else {
      returnedRowType = buildQueryMaker(rootQueryRel, parsed.getInsertNode()).getResultType();
    }

    return new PrepareResult(returnedRowType, parameterTypes);

Plan

        plannerContext.getQueryContext().addSystemParam(
            DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
            plannerContext.getJsonMapper().writeValueAsString(parsed.getIngestionGranularity())
        );
    this.rexBuilder = new RexBuilder(planner.getTypeFactory());

    try {
      return planWithDruidConvention(rootQueryRel, parsed);
    ...

Planning

DruidRel represents a Druid query. One of:

  • DruidJoinQueryRel
  • DruidOuterJoinQueryRel
  • DruidQueryRel
  • DruidUnionDataSourceRel
  • DruidUnionRel

Interestingly, there is no insert Rel.

It appears that these rels represent the physical plan as whole, near-complete Druid queries rather than as separate plan components. That is, the conversion goes directly from the logical plan to the near-complete Druid query. The conversion is done in:

  public abstract DruidQuery toDruidQuery(boolean finalizeAggregations);

The relational operators are created via the DruidRules.
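The "each rel converts itself into a query" pattern can be sketched with an abstract method mirroring toDruidQuery(boolean finalizeAggregations). This is hypothetical: DruidRelSketch, QueryRelSketch, and OuterQueryRelSketch are invented names, strings stand in for DruidQuery, and asking the inner rel for an unfinalized query assumes that only the outermost query needs finalized aggregations.

```java
import java.util.List;

// Hypothetical base: every rel can produce a (near-)complete native query directly.
abstract class DruidRelSketch {
  abstract String toDruidQuery(boolean finalizeAggregations);
}

// A rel over a plain table with a list of aggregator names.
final class QueryRelSketch extends DruidRelSketch {
  private final String dataSource;
  private final List<String> aggregators;

  QueryRelSketch(String dataSource, List<String> aggregators) {
    this.dataSource = dataSource;
    this.aggregators = aggregators;
  }

  @Override
  String toDruidQuery(boolean finalizeAggregations) {
    return "{dataSource:" + dataSource + ", aggregations:" + aggregators
        + ", finalize:" + finalizeAggregations + "}";
  }
}

// A rel whose input is another rel: the inner query becomes a query datasource.
final class OuterQueryRelSketch extends DruidRelSketch {
  private final DruidRelSketch inner;
  private final List<String> aggregators;

  OuterQueryRelSketch(DruidRelSketch inner, List<String> aggregators) {
    this.inner = inner;
    this.aggregators = aggregators;
  }

  @Override
  String toDruidQuery(boolean finalizeAggregations) {
    return "{dataSource:query(" + inner.toDruidQuery(false) + "), aggregations:" + aggregators
        + ", finalize:" + finalizeAggregations + "}";
  }
}
```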

Calcite Tests

References