MSQ Schemaless Notes
- Calcite has the notion of a `SqlTypeName.DYNAMIC_STAR`, which apparently means "a set of columns to be defined later."
- Calcite has `RelDataTypeHolder`: "Holding the expandable list of fields for dynamic table".
Behavior:
- Input source should be configured to be dynamic. For CSV, say, it doesn't make sense to define some columns but not others.
- Input columns have the type from the input system. For CSV, all columns are string. Else, use whatever Clint created.
- Input is dynamic, so the list of columns is known only at run time:
`*` -> [ Input source ] -> (cols)
Proposal:
In SQL, every input source can have an explicit or inferred schema. We have the explicit form today. For inferred, we use the schema inference rules provided by schemaless, which will, essentially, produce a map of name/value pairs, where the values can be of any type.
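To make the inference rules concrete, here is an illustrative sketch, assuming the schemaless rules amount to "take names and types from the source; for text formats, every value is a string." This is not Druid code; `infer_csv` and `infer_json` are hypothetical names.

```python
# Illustrative sketch only: infer_csv / infer_json are hypothetical names,
# not Druid APIs. They show what an "inferred schema" yields per format.
import csv
import io
import json

def infer_csv(text: str) -> list[dict]:
    """CSV: the header row supplies names; every value is a string."""
    return list(csv.DictReader(io.StringIO(text)))

def infer_json(lines: str) -> list[dict]:
    """JSON: both names and value types come from the source itself."""
    return [json.loads(line) for line in lines.splitlines()]

csv_rows = infer_csv("ts,bytes\n2024-01-01,1024\n")
json_rows = infer_json('{"ts": "2024-01-01", "bytes": 1024}')
# CSV values are all strings; JSON preserves the numeric type of "bytes".
```

Either way, each row comes back as a map of name/value pairs; the "schema" is just whatever keys and value types show up.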
We want to minimize changes to the planner and to MSQ. This means we won't a) require an external declaration of the schema (as in HMS) or b) do a dynamic schema (as in Drill). Instead, we want Calcite to work with known types.
To do this, we revise the way we declare schema (or introduce a new keyword):
```sql
--** finalizeAggregators=false
INSERT INTO dst
SELECT TIME_PARSE(ts), SUM(bytes), *
FROM TABLE(local(format => 'csv', ..., inferSchema => TRUE))
PARTIAL (ts VARCHAR, bytes BIGINT)
```
Here, `finalizeAggregators` and `PARTIAL` are placeholders: the former means "do rollup," and the latter is a new way to define the schema. Also, `inferSchema` is a proxy for the input-source-specific options to infer a schema. (For CSV, say, one has to request inference and identify which line contains the header.)
The `PARTIAL` schema is a statement that Druid will infer the schema. Since that inference happens at run time, the `PARTIAL` clause tells the planner what the runtime will find: that there will be a column `ts`, and it should be treated as a string; also, that there will be a column `bytes` that, in CSV, is a string, but should be parsed as a long. Note that `PARTIAL` has different semantics than the existing `EXTEND` keyword, which defines the full schema.
This trick works only for file formats that can infer column names and types. For text-based files, it is fine to infer the columns as strings unless the user says otherwise. For CSV files without headers, Druid can put all columns into a single array, called, say, `columns`. This is awkward, but would only be used to get started: a next pass would probably spell out the schema in the query or in the catalog for text files without headers.
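The headerless-CSV fallback can be sketched as follows; `read_headerless` is a hypothetical name, and `columns` is the suggested single-array column name from above.

```python
# Hypothetical sketch: a headerless CSV lands every value in one
# array-valued column named "columns" (the name suggested above).
import csv
import io

def read_headerless(text: str) -> list[dict]:
    return [{"columns": row} for row in csv.reader(io.StringIO(text))]

rows = read_headerless("2024-01-01,1024,ok\n")
# Each row is a single column holding the whole record as an array.
```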
The partial schema gives Calcite what it needs to handle the expressions in the query.
That leaves the other columns. Including a wildcard (`*`) in a query says to include all columns not listed in the partial schema. All such columns will have the names and types defined by the schemaless type-inference rules.
A result of this behavior is that Calcite cannot know the schema of the target segments. It can know a subset of the schema (the columns named explicitly), but not those defined at run time. For such columns, Calcite will report `__dynamic TYPE('COMPLEX<json>')` as the implicit column that holds the "columns to be named later."
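A minimal sketch of this split, assuming the planner-visible schema is just the `PARTIAL` declarations: declared columns surface directly (cast to their declared types), and everything else lands in the implicit `__dynamic` map. `split_row` and `PARTIAL_SCHEMA` are hypothetical names, not Druid APIs.

```python
# Hypothetical sketch (not Druid code) of the wildcard behavior: declared
# PARTIAL columns surface directly; everything else lands in __dynamic.
PARTIAL_SCHEMA = {"ts": str, "bytes": int}  # from PARTIAL (ts VARCHAR, bytes BIGINT)

def split_row(row: dict) -> dict:
    out: dict = {}
    dynamic: dict = {}
    for name, value in row.items():
        if name in PARTIAL_SCHEMA:
            out[name] = PARTIAL_SCHEMA[name](value)  # cast per declared type
        else:
            dynamic[name] = value  # a "column to be named later"
    out["__dynamic"] = dynamic
    return out

row = split_row({"ts": "2024-01-01", "bytes": "1024", "comment": "ok", "host": "a"})
# "bytes" is cast to an int; "comment" and "host" live in row["__dynamic"].
```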
A nice-to-have is additional syntax to exclude columns:

```sql
...
FROM TABLE(local(
  format => 'csv',
  ...,
  inferSchema => TRUE,
  exclude => ARRAY['comment']))
PARTIAL (ts VARCHAR, bytes BIGINT)
```
Native Query
The native query requires a special field, which Calcite calls a "dynamic star" (or `**`), to represent the list of dynamic columns. Or, the field can be named `__dynamic`, with some special expression that says what to do.
Each input source has some way to specify that the code is to infer the schema. (Each class may have its own way.) Each class also takes an "exclude list". For our example above, that list would be `("ts", "bytes")`. All other columns go into the map.
MSQ Internals
MSQ passes the input source spec into the input source code, which includes the infer-schema option and the exclude list. The cursor for such an input source is responsible for creating the `__dynamic` column as a map; the caller passes in the "exclude" list.
Since the map is just another column, MSQ processes it as such. MSQ performs no operations on the columns in intermediate stages other than using the map as a key for rollup (with the obvious caveat that the map keys must be ordered). That is, rather than basing grouping on an enumerated set of auto-discovered columns, grouping is based on the map as a whole. This avoids the need to ever "expand" the map into actual columns within MSQ.
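The map-as-grouping-key idea can be illustrated as below, with the "keys must be ordered" caveat handled by canonicalizing the entries before they are used as a key. Illustrative Python only; `group_key` is a hypothetical helper, not MSQ code.

```python
# Illustrative sketch: rollup groups on the __dynamic map as a whole.
# Sorting the entries satisfies the "map keys must be ordered" caveat,
# so equal maps compare equal regardless of insertion order.
from collections import defaultdict

def group_key(declared: tuple, dynamic: dict) -> tuple:
    return declared + (tuple(sorted(dynamic.items())),)

rollup: dict = defaultdict(int)
rows = [
    (("2024-01-01",), {"host": "a", "dc": "us"}, 10),
    (("2024-01-01",), {"dc": "us", "host": "a"}, 5),  # same map, new order
    (("2024-01-01",), {"host": "b", "dc": "us"}, 7),
]
for declared, dynamic, nbytes in rows:
    rollup[group_key(declared, dynamic)] += nbytes
# The first two rows collapse into a single group.
```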
When MSQ goes to write columns to segments, it expands the `__dynamic` column into individual columns (indexers) using the code that already exists for other ingestion paths.
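The expansion step could look like this sketch: one column per discovered key, with nulls back-filled for rows where the key was absent. Again illustrative only; Druid's actual indexer code differs, and `expand_dynamic` is a hypothetical name.

```python
# Hypothetical sketch (not Druid's indexer code): fan the __dynamic map
# out into one column per discovered key, back-filling nulls for rows
# where a key was absent.
def expand_dynamic(rows: list[dict]) -> dict[str, list]:
    columns: dict[str, list] = {}
    for i, row in enumerate(rows):
        for name, value in row["__dynamic"].items():
            # Create the column lazily, padding earlier rows with None.
            columns.setdefault(name, [None] * i).append(value)
        for col in columns.values():
            if len(col) < i + 1:  # key absent from this row
                col.append(None)
    return columns

cols = expand_dynamic([
    {"__dynamic": {"host": "a"}},
    {"__dynamic": {"host": "b", "dc": "us"}},
])
# "dc" only appears in the second row, so its first entry is None.
```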