MSQ Schemaless Notes

  • Calcite has the notion of a SqlTypeName.DYNAMIC_STAR which apparently means "a set of columns to be defined later."
  • Calcite has RelDataTypeHolder: "Holding the expandable list of fields for dynamic table"

Behavior:

  • The input source should be configured to be dynamic. For CSV, say, it doesn't make sense to define some columns but not others.
  • Input columns take their types from the input system. For CSV, all columns are strings. Otherwise, use whatever types Clint's type-inference code produces.
  • The input is dynamic, so the list of columns is known only at run time:
`*` -> [ Input source ] -> (cols)

Proposal:

In SQL, every input source can have an explicit or inferred schema. We have the explicit form today. For inferred, we use the schema inference rules provided by schemaless, which will, essentially, produce a map of name/value pairs, where the values can be of any type.
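For reference, the explicit form today uses the EXTERN table function with the EXTEND clause. The following is a sketch: the JSON arguments are elided and the option values are illustrative.

SELECT ts, bytes
FROM TABLE(EXTERN(
  '{"type": "local", ...}',
  '{"type": "csv", ...}'))
  EXTEND (ts VARCHAR, bytes BIGINT)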

We want to minimize changes to the planner and to MSQ. This means we will neither a) require an external declaration of the schema (as in the Hive Metastore) nor b) support a fully dynamic schema (as in Drill). Instead, we want Calcite to work with known types.

To do this, we revise the way we declare schema (or introduce a new keyword):

--** finalizeAggregations=false
INSERT INTO dst
SELECT TIME_PARSE(ts), SUM(bytes), *
FROM TABLE(local(format => 'csv', ..., inferSchema => TRUE))
PARTIAL (ts VARCHAR, bytes BIGINT)

Here, the finalizeAggregations option and the PARTIAL clause are placeholders: the first means "do rollup"; the second is a new way to define the schema. Also, inferSchema is a proxy for the input-source-specific options that request schema inference. (For CSV, say, one has to request inference and identify which line contains the header.)

The PARTIAL schema is a statement that Druid will infer the schema. Since that inference happens at run time, the PARTIAL clause tells the planner what the runtime will find: there will be a column ts, which should be treated as a string, and a column bytes that, in CSV, arrives as a string but should be parsed as a BIGINT. Note that PARTIAL has different semantics than the existing EXTEND keyword, which defines the full schema.

This trick works only for file formats that can infer column names and types. For text-based files, it is fine to infer the columns as strings unless the user says otherwise. For CSV files without headers, Druid can put all columns into a single array called, say, columns. This is awkward, but would only be used to get started: a next pass would probably spell out the schema in the query or in the catalog for text files without headers.
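For illustration, a query over a headerless CSV might look something like this (hypothetical: the bracket indexing and the VARCHAR ARRAY declaration are assumptions, not settled syntax):

...
SELECT TIME_PARSE(columns[1]), SUM(CAST(columns[2] AS BIGINT))
FROM TABLE(local(format => 'csv', ..., inferSchema => TRUE))
PARTIAL (columns VARCHAR ARRAY)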

The partial schema gives Calcite what it needs to handle the expressions in the query.

That leaves the other columns. Including a wildcard (*) in a query says to include all columns not listed in the partial schema. All such columns will have the names and types defined by the schemaless type inference rules.

A result of this behavior is that Calcite cannot know the full schema of the target segments. It can know a subset of the schema (those columns named explicitly), but not the columns defined at runtime. For such columns, Calcite will report an implicit column, __dynamic, of type COMPLEX<json>, that holds the "columns to be named later."
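For the INSERT example above, the planner-visible output schema might look like this (a sketch; the explicit column names are illustrative, since the example does not alias its expressions):

__time     TIMESTAMP
bytes      BIGINT
__dynamic  COMPLEX<json>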

A nice-to-have is to add additional syntax to exclude columns:

...
FROM TABLE(local(
  format => 'csv',
  ...,
  inferSchema => TRUE,
  exclude => ARRAY['comment']))
PARTIAL (ts VARCHAR, bytes BIGINT)

Native Query

The native query requires a special field, which Calcite calls a "dynamic star" (written **), to represent the list of dynamic columns. Alternatively, the field can be named __dynamic, with some special expression that says what to do.

Each input source has some way to specify that the code is to infer the schema. (Each class may have its own way.) Each class also takes an "exclude list". For our example above, that list would be ("ts", "bytes"). All other columns go into the map.
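A hypothetical sketch of how this might look in a native scan query over an external data source (the inferSchema and exclude fields, and the ** column marker, are proposals, not existing options):

{
  "queryType": "scan",
  "dataSource": {
    "type": "external",
    "inputSource": { "type": "local", ..., "inferSchema": true, "exclude": ["ts", "bytes"] },
    "inputFormat": { "type": "csv" }
  },
  "columns": ["ts", "bytes", "**"]
}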

MSQ Internals

MSQ passes the input source spec into the input source code; the spec includes the infer-schema option and the exclude list. The cursor for such an input source is responsible for creating the __dynamic column as a map, omitting the columns in the exclude list passed in by the caller.

Since the map is just another column, MSQ processes it as such. MSQ performs no operations on the columns in intermediate stages other than using the map as a key for rollup (with the obvious caveat that the map keys must be ordered). That is, rather than grouping being based on an enumerated set of auto-discovered columns, grouping is based on the map as a whole. This avoids the need to ever "expand" the map into actual columns within MSQ.
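Conceptually, the rollup behaves as if the wildcard in the earlier example were replaced by the map column itself (a hypothetical illustration using the proposed syntax):

INSERT INTO dst
SELECT TIME_PARSE(ts), __dynamic, SUM(bytes)
FROM TABLE(local(format => 'csv', ..., inferSchema => TRUE))
PARTIAL (ts VARCHAR, bytes BIGINT)
GROUP BY 1, 2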

When MSQ goes to write columns to segments, it expands the __dynamic column into individual columns (indexers) using the code that already exists for other ingestion paths.