WIP: changes to upstream DF, in order to enable parallelized writes with ParquetSink #11
Conversation
…set of TableParquetOptions
```rust
type Error = DataFusionError;

fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
    let parquet_session_options = &parquet_options.global;
    let ParquetOptions {
```
The `ParquetOptions` are the configuration which can be provided within a SQL query, and are therefore intended for use in an easily parsable format (refer to the `ConfigField` trait and associated macros in the linked file).

The `sorting_columns` may lend themselves to this use case of being provided within a SQL query, since they are easier to parse. However, the same is not true for the user-provided `kv_metadata`.
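For readers unfamiliar with that config machinery, here is a minimal sketch of the string-keyed pattern it implies. This is not DataFusion's actual `ConfigField` definition; the trait and field names below are invented for illustration:

```rust
/// Simplified stand-in for the ConfigField idea: every option must be
/// settable from a (key, value) pair of plain strings arriving from SQL.
trait ConfigFieldLike {
    fn set(&mut self, key: &str, value: &str) -> Result<(), String>;
}

struct ParquetOpts {
    max_row_group_size: usize,
}

impl ConfigFieldLike for ParquetOpts {
    fn set(&mut self, key: &str, value: &str) -> Result<(), String> {
        match key {
            // A scalar option parses cleanly from a single string value...
            "max_row_group_size" => {
                self.max_row_group_size =
                    value.parse().map_err(|e| format!("invalid value: {e}"))?;
                Ok(())
            }
            // ...whereas structured values like kv_metadata have no obvious
            // single-string encoding, which is the tension noted above.
            other => Err(format!("unknown config key: {other}")),
        }
    }
}
```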
I have a question regarding the use case for the `WriterProperties` `sorting_columns`. It's listed in the parquet interface; is this referring to a per-row-group sorting that is applied only on write? And is there a use case for DataFusion, given that we already sort earlier in the batch stream?
> I have a question regarding the use case for the `WriterProperties` `sorting_columns`. It's listed in the parquet interface;

In theory it is supposed to be used to let readers infer information from the file. I don't know how widely it is written or used by other parquet readers/writers.

IOx stores its sort information in its own metadata, so I think setting these fields in the parquet metadata could be a separate project.
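For context, a hedged sketch of what setting that field looks like with the parquet crate's builder (the column index here is made up). Note that this only records a claim in the file metadata; the writer does not re-sort the data itself:

```rust
use parquet::file::properties::WriterProperties;
use parquet::format::SortingColumn;

fn sorted_props() -> WriterProperties {
    // Declare that row groups are sorted by column 0 (ascending, nulls
    // first). Readers may inspect this declaration; nothing is re-sorted
    // on write.
    WriterProperties::builder()
        .set_sorting_columns(Some(vec![SortingColumn::new(0, false, true)]))
        .build()
}
```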
The basic idea looks good to me here. 👍 @wiedld
One thing that might be worth considering is how to test this.

For example, do we want to add a SQL level API like

`COPY (values (1), (2)) TO 'foo.parquet' OPTION (metadata "foo:bar")`

(probably not that exact syntax)

Or maybe we just expose the APIs for use programmatically.
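As a concrete sketch of the SQL-level direction, something like the following could exercise it end to end. The `OPTIONS` spelling here is invented to echo the hypothetical syntax above, not a confirmed grammar:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical option key for attaching file-level key/value metadata;
    // the final syntax may well differ, as noted above.
    ctx.sql(
        "COPY (VALUES (1), (2)) TO 'foo.parquet' \
         OPTIONS ('format.metadata::foo' 'bar')",
    )
    .await?
    .collect()
    .await?;
    Ok(())
}
```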
datafusion/common/src/config.rs
```rust
/// Optional, additional metadata to be inserted into the key_value_metadata
/// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
#[cfg(feature = "parquet")]
pub key_value_metadata: Option<Vec<KeyValue>>,
```
I think `KeyValue` is just owned strings: https://docs.rs/parquet/latest/parquet/format/struct.KeyValue.html

So we could avoid the `cfg`s by doing something like

```diff
- pub key_value_metadata: Option<Vec<KeyValue>>,
+ pub key_value_metadata: HashMap<String, Option<String>>,
```

And then translating that to `KeyValue`s during the write.

This would also make the protobuf easier to handle (just follow the encoding for HashMaps).
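A minimal sketch of that translation, assuming parquet's `KeyValue::new` constructor (the function name here is invented for illustration):

```rust
use std::collections::HashMap;

use parquet::format::KeyValue;

/// Convert the HashMap form into the Vec<KeyValue> the parquet writer
/// expects, leaving the file metadata unset when no entries were given.
fn to_kv_metadata(metadata: &HashMap<String, Option<String>>) -> Option<Vec<KeyValue>> {
    if metadata.is_empty() {
        return None;
    }
    Some(
        metadata
            .iter()
            .map(|(key, value)| KeyValue::new(key.clone(), value.clone()))
            .collect(),
    )
}
```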
```rust
reorder_filters: _,
allow_single_file_parallelism: _,
maximum_parallel_row_group_writers: _,
maximum_buffered_record_batches_per_stream: _,
```
This is a nice change 👌
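The pattern being praised is exhaustive destructuring: by naming every field, even the ignored ones, adding a new field to the options struct becomes a compile error at this conversion site. A toy illustration (the types here are invented):

```rust
struct Opts {
    compression: Option<String>,
    reorder_filters: bool,
}

fn writer_compression(opts: &Opts) -> Option<String> {
    // Destructure every field; `_` marks fields that are deliberately
    // irrelevant here. If a field is later added to `Opts`, this pattern
    // stops compiling, forcing the conversion to be revisited.
    let Opts {
        compression,
        reorder_filters: _,
    } = opts;
    compression.clone()
}
```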
Added sqllogictests, which did require a change for proper config after statement parse. Syntax is almost exactly as requested by @alamb.
This patch will be included in this DataFusion update: https://github.com/influxdata/influxdb_iox/pull/10780. It is in the queue behind two PRs: https://github.com/influxdata/influxdb_iox/pull/10764 and https://github.com/influxdata/influxdb_iox/pull/10772. If those two PRs deploy without issue, I plan to merge https://github.com/influxdata/influxdb_iox/pull/10780 tomorrow morning.
#13 brought in the upstream change (apache#10224 / apache@9c8873a), so closing this one.
… `interval` (apache#11466)

* Unparser rule for datetime cast (#10)
* use timestamp as the identifier for date64
* rename
* implement CustomDialectBuilder
* fix
* dialect with interval style (#11)
* fmt
* clippy
* doc
* Update datafusion/sql/src/unparser/expr.rs
* update the doc for CustomDialectBuilder
* fix doc test

Co-authored-by: Phillip LeBlanc <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
This branches off the most recent DF version in IOx, then adds additional patches to make the parallelized ParquetSink work with our metadata use case.

Background:
IOx adds its own metadata to the parquet file. Currently, we do so by setting it on the `WriterProperties` used with the `ArrowWriter`. When we did the ParquetSink parallelized-write PoC, we also provided this IOx metadata by adding it to the `WriterProperties` given to `ParquetSink::write_all()`.
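Roughly, that current path looks like the following sketch; the metadata key and payload are placeholders, not IOx's real ones:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use parquet::format::KeyValue;

fn write_with_metadata() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    // Attach the (placeholder) metadata via WriterProperties.
    let props = WriterProperties::builder()
        .set_key_value_metadata(Some(vec![KeyValue::new(
            "iox::metadata".to_string(),
            Some("...serialized metadata...".to_string()),
        )]))
        .build();

    let file = File::create("out.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```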
The approach used in the PoC is no longer viable. There was a change to unify the different writer options across sink types, specifically to make `COPY TO` and `CREATE EXTERNAL TABLE` have a uniform configuration. Users can now specify the configuration with the query (e.g. `COPY <src> TO <sink> (<config_options>)`). This was a good high-level change; however, we would like to iterate on this approach.

The current implementation derives the writer properties from the TableParquetOptions. This conversion always sets the sorting_columns and user-defined kv_metadata to None, as demonstrated in the first commit. We have several choices in how to restore the ability to set these options -- choices which are commented on below in this WIP.
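One of those choices, sketched: plumb the fields through the conversion instead of hard-coding None. The option struct below is a hypothetical stand-in for the metadata-bearing slice of TableParquetOptions, not the real type:

```rust
use parquet::file::properties::WriterProperties;
use parquet::format::{KeyValue, SortingColumn};

// Hypothetical stand-in for the metadata-bearing part of TableParquetOptions.
struct MetadataOpts {
    key_value_metadata: Option<Vec<KeyValue>>,
    sorting_columns: Option<Vec<SortingColumn>>,
}

fn writer_properties(opts: &MetadataOpts) -> WriterProperties {
    WriterProperties::builder()
        // Carry the user-provided values through rather than dropping them.
        .set_key_value_metadata(opts.key_value_metadata.clone())
        .set_sorting_columns(opts.sorting_columns.clone())
        .build()
}
```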