From 9a74a2588ec99221fd66565b9099b2c0056492ec Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Mon, 16 Dec 2024 16:52:53 -0500
Subject: [PATCH 01/11] Update sysinfo requirement from 0.32.0 to 0.33.0
 (#6835)

* Update sysinfo requirement from 0.32.0 to 0.33.0

Updates the requirements on [sysinfo](https://github.com/GuillaumeGomez/sysinfo) to permit the latest version.
- [Changelog](https://github.com/GuillaumeGomez/sysinfo/blob/master/CHANGELOG.md)
- [Commits](https://github.com/GuillaumeGomez/sysinfo/commits)

---
updated-dependencies:
- dependency-name: sysinfo
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot]

* chore: Update example for API change

---------

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Andrew Lamb
---
 parquet/Cargo.toml                | 2 +-
 parquet/examples/write_parquet.rs | 5 ++---
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 4064baba0947..19f890710778 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -67,7 +67,7 @@ hashbrown = { version = "0.15", default-features = false }
 twox-hash = { version = "1.6", default-features = false }
 paste = { version = "1.0" }
 half = { version = "2.1", default-features = false, features = ["num-traits"] }
-sysinfo = { version = "0.32.0", optional = true, default-features = false, features = ["system"] }
+sysinfo = { version = "0.33.0", optional = true, default-features = false, features = ["system"] }
 crc32fast = { version = "1.4.2", optional = true, default-features = false }
 
 [dev-dependencies]
diff --git a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs
index 1b51d40c8134..ebdd9527b6f1 100644
--- a/parquet/examples/write_parquet.rs
+++ b/parquet/examples/write_parquet.rs
@@ -28,7 +28,7 @@ use parquet::arrow::ArrowWriter as ParquetWriter;
 use parquet::basic::Encoding;
 use parquet::errors::Result;
 use parquet::file::properties::{BloomFilterPosition, WriterProperties};
-use sysinfo::{MemoryRefreshKind, ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System};
+use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System};
 
 #[derive(ValueEnum, Clone)]
 enum BloomFilterPositionArg {
@@ -97,8 +97,7 @@ fn main() -> Result<()> {
     let file = File::create(args.path).unwrap();
     let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?;
 
-    let mut system =
-        System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything()));
+    let mut system = System::new_with_specifics(RefreshKind::everything());
     eprintln!(
         "{} Writing {} batches of {} rows.
RSS = {}", now(), From 123045cc766d42d1eb06ee8bb3f09e39ea995ddc Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Mon, 16 Dec 2024 16:07:57 -0800 Subject: [PATCH 02/11] deprecate max_statistics_size writer property (#6884) --- parquet/src/bin/parquet-rewrite.rs | 1 + parquet/src/file/properties.rs | 20 ++++++++++++-------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/parquet/src/bin/parquet-rewrite.rs b/parquet/src/bin/parquet-rewrite.rs index eaecda50375d..5a1ec94d5502 100644 --- a/parquet/src/bin/parquet-rewrite.rs +++ b/parquet/src/bin/parquet-rewrite.rs @@ -242,6 +242,7 @@ fn main() { if let Some(value) = args.dictionary_page_size_limit { writer_properties_builder = writer_properties_builder.set_dictionary_page_size_limit(value); } + #[allow(deprecated)] if let Some(value) = args.max_statistics_size { writer_properties_builder = writer_properties_builder.set_max_statistics_size(value); } diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 7b688333e540..dc918f6b5634 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -41,6 +41,7 @@ pub const DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT: usize = 20_000; /// Default value for [`WriterProperties::statistics_enabled`] pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page; /// Default value for [`WriterProperties::max_statistics_size`] +#[deprecated(since = "54.0.0", note = "Unused; will be removed in 56.0.0")] pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; /// Default value for [`WriterProperties::max_row_group_size`] pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; @@ -350,7 +351,9 @@ impl WriterProperties { /// Returns max size for statistics. /// Only applicable if statistics are enabled. + #[deprecated(since = "54.0.0", note = "Unused; will be removed in 56.0.0")] pub fn max_statistics_size(&self, col: &ColumnPath) -> usize { + #[allow(deprecated)] self.column_properties .get(col) .and_then(|c| c.max_statistics_size()) @@ -601,7 +604,9 @@ impl WriterPropertiesBuilder { /// Sets default max statistics size for all columns (defaults to `4096`). /// /// Applicable only if statistics are enabled. + #[deprecated(since = "54.0.0", note = "Unused; will be removed in 56.0.0")] pub fn set_max_statistics_size(mut self, value: usize) -> Self { + #[allow(deprecated)] self.default_column_properties .set_max_statistics_size(value); self @@ -706,7 +711,9 @@ impl WriterPropertiesBuilder { /// Sets max size for statistics for a specific column. /// /// Takes precedence over [`Self::set_max_statistics_size`]. + #[deprecated(since = "54.0.0", note = "Unused; will be removed in 56.0.0")] pub fn set_column_max_statistics_size(mut self, col: ColumnPath, value: usize) -> Self { + #[allow(deprecated)] self.get_mut_props(col).set_max_statistics_size(value); self } @@ -896,6 +903,7 @@ struct ColumnProperties { codec: Option, dictionary_enabled: Option, statistics_enabled: Option, + #[deprecated(since = "54.0.0", note = "Unused; will be removed in 56.0.0")] max_statistics_size: Option, /// bloom filter related properties bloom_filter_properties: Option, @@ -934,6 +942,8 @@ impl ColumnProperties { } /// Sets max size for statistics for this column. + #[deprecated(since = "54.0.0", note = "Unused; will be removed in 56.0.0")] + #[allow(deprecated)] fn set_max_statistics_size(&mut self, value: usize) { self.max_statistics_size = Some(value); } @@ -998,7 +1008,9 @@ impl ColumnProperties { } /// Returns optional max size in bytes for statistics. 
+    #[deprecated(since = "54.0.0", note = "Unused; will be removed in 56.0.0")]
     fn max_statistics_size(&self) -> Option<usize> {
+        #[allow(deprecated)]
         self.max_statistics_size
     }
@@ -1142,10 +1154,6 @@ mod tests {
             props.statistics_enabled(&ColumnPath::from("col")),
             DEFAULT_STATISTICS_ENABLED
         );
-        assert_eq!(
-            props.max_statistics_size(&ColumnPath::from("col")),
-            DEFAULT_MAX_STATISTICS_SIZE
-        );
         assert!(props
             .bloom_filter_properties(&ColumnPath::from("col"))
            .is_none());
@@ -1222,13 +1230,11 @@ mod tests {
             .set_compression(Compression::GZIP(Default::default()))
             .set_dictionary_enabled(false)
             .set_statistics_enabled(EnabledStatistics::None)
-            .set_max_statistics_size(50)
             // specific column settings
             .set_column_encoding(ColumnPath::from("col"), Encoding::RLE)
             .set_column_compression(ColumnPath::from("col"), Compression::SNAPPY)
             .set_column_dictionary_enabled(ColumnPath::from("col"), true)
             .set_column_statistics_enabled(ColumnPath::from("col"), EnabledStatistics::Chunk)
-            .set_column_max_statistics_size(ColumnPath::from("col"), 123)
             .set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
             .set_column_bloom_filter_ndv(ColumnPath::from("col"), 100_u64)
             .set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.1)
@@ -1260,7 +1266,6 @@ mod tests {
             props.statistics_enabled(&ColumnPath::from("a")),
             EnabledStatistics::None
         );
-        assert_eq!(props.max_statistics_size(&ColumnPath::from("a")), 50);
 
         assert_eq!(
             props.encoding(&ColumnPath::from("col")),
@@ -1275,7 +1280,6 @@ mod tests {
             props.statistics_enabled(&ColumnPath::from("col")),
             EnabledStatistics::Chunk
         );
-        assert_eq!(props.max_statistics_size(&ColumnPath::from("col")), 123);
         assert_eq!(
             props.bloom_filter_properties(&ColumnPath::from("col")),
             Some(&BloomFilterProperties { fpp: 0.1, ndv: 100 })

From f8f24cf58311fee7bb2843144a615a80fb281fcb Mon Sep 17 00:00:00 2001
From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
Date: Tue, 17 Dec 2024 20:48:34 +0200
Subject: [PATCH 03/11] docs: fix typo (#6890)

---
 arrow-array/src/array/dictionary_array.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/arrow-array/src/array/dictionary_array.rs b/arrow-array/src/array/dictionary_array.rs
index e69de783dd54..f852b57fb65e 100644
--- a/arrow-array/src/array/dictionary_array.rs
+++ b/arrow-array/src/array/dictionary_array.rs
@@ -249,7 +249,7 @@ pub struct DictionaryArray<K: ArrowDictionaryKeyType> {
     /// map to the real values.
     keys: PrimitiveArray<K>,
 
-    /// Array of dictionary values (can by any DataType).
+    /// Array of dictionary values (can be any DataType).
     values: ArrayRef,
 
     /// Values are ordered.
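For downstream users of PATCH 02/11 above, the takeaway is that statistics size limits no longer need configuring. What follows is a minimal sketch, not code from the series, of building `WriterProperties` without the now-deprecated `set_max_statistics_size`; it uses only builder calls that appear in the diffs above.

```rust
use parquet::basic::Compression;
use parquet::file::properties::{EnabledStatistics, WriterProperties};

// A minimal sketch: configure writer properties without the deprecated
// `set_max_statistics_size` call; page-level statistics is the default shown
// in PATCH 02/11 (`DEFAULT_STATISTICS_ENABLED = EnabledStatistics::Page`).
fn example_properties() -> WriterProperties {
    WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_dictionary_enabled(true)
        .set_statistics_enabled(EnabledStatistics::Page)
        .build()
}
```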
From 54dccadccc6b599b93a46aef5f03dfd4d9b7349e Mon Sep 17 00:00:00 2001
From: Ed Seidl
Date: Tue, 17 Dec 2024 10:49:07 -0800
Subject: [PATCH 04/11] fix deprecation notice (#6889)

---
 parquet/src/arrow/schema/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 4ae3fdb8e5cf..5d3d7b2a6541 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -345,7 +345,7 @@ impl<'a> ArrowSchemaConverter<'a> {
 ///
 /// The name of the root schema element defaults to `"arrow_schema"`, this can be
 /// overridden with [`ArrowSchemaConverter`]
-#[deprecated(since = "54.0.0", note = "Use `ArrowToParquetSchemaConverter` instead")]
+#[deprecated(since = "54.0.0", note = "Use `ArrowSchemaConverter` instead")]
 pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
     ArrowSchemaConverter::new().convert(schema)
 }

From 10793608c84f9d26d89d32e2cf72107e632ee84c Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 18 Dec 2024 04:52:44 -0500
Subject: [PATCH 05/11] Add Field::with_dict_is_ordered (#6885)

---
 arrow-schema/src/field.rs | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/arrow-schema/src/field.rs b/arrow-schema/src/field.rs
index 7d47c0ae1dea..13bb7abf51b4 100644
--- a/arrow-schema/src/field.rs
+++ b/arrow-schema/src/field.rs
@@ -426,6 +426,19 @@ impl Field {
     }
 
     /// Returns whether this `Field`'s dictionary is ordered, if this is a dictionary type.
+    ///
+    /// # Example
+    /// ```
+    /// # use arrow_schema::{DataType, Field};
+    /// // non dictionaries do not have a dict is ordered flag
+    /// let field = Field::new("c1", DataType::Int64, false);
+    /// assert_eq!(field.dict_is_ordered(), None);
+    /// // by default a dictionary is not ordered
+    /// let field = Field::new("c1", DataType::Dictionary(Box::new(DataType::Int64), Box::new(DataType::Utf8)), false);
+    /// assert_eq!(field.dict_is_ordered(), Some(false));
+    /// let field = field.with_dict_is_ordered(true);
+    /// assert_eq!(field.dict_is_ordered(), Some(true));
+    /// ```
     #[inline]
     pub const fn dict_is_ordered(&self) -> Option<bool> {
         match self.data_type {
@@ -434,6 +447,18 @@ impl Field {
         }
     }
 
+    /// Sets whether this `Field`'s dictionary is ordered, if it is a dictionary.
+    ///
+    /// Does nothing if this is not a dictionary type.
+    ///
+    /// See [`Field::dict_is_ordered`] for more information.
+    pub fn with_dict_is_ordered(mut self, dict_is_ordered: bool) -> Self {
+        if matches!(self.data_type, DataType::Dictionary(_, _)) {
+            self.dict_is_ordered = dict_is_ordered;
+        };
+        self
+    }
+
     /// Merge this field into self if it is compatible.
     ///
     /// Struct fields are merged recursively.

From 3317989711a4c081f5118d8fe126280ced7a92fb Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 18 Dec 2024 05:52:15 -0500
Subject: [PATCH 06/11] Minor: make it easier to find fix instructions when
 `cargo fmt` on parquet fails (#6886)

* Minor: make it easier to find instructions when fmt fails

* purposely introduce a fmt issue

* Revert "purposely introduce a fmt issue"

This reverts commit 440e52079135df85128b15936425d2b5af488007.
* Update .github/workflows/rust.yml

Co-authored-by: Ed Seidl

---------

Co-authored-by: Ed Seidl
---
 .github/workflows/rust.yml | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index ff5040fd2947..72a53263d330 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -101,12 +101,13 @@ jobs:
       - name: Format arrow
         run: cargo fmt --all -- --check
       - name: Format parquet
-        # Many modules in parquet are skipped, so check parquet separately. If this check fails, run:
-        # cargo fmt -p parquet -- --config skip_children=true `find ./parquet -name "*.rs" \! -name format.rs`
-        # from the top level arrow-rs directory and check in the result.
+        # Many modules in parquet are skipped, so check parquet separately
         # https://github.com/apache/arrow-rs/issues/6179
         working-directory: parquet
-        run: cargo fmt -p parquet -- --check --config skip_children=true `find . -name "*.rs" \! -name format.rs`
+        run: |
+          # if this fails, run this from the parquet directory:
+          # cargo fmt -p parquet -- --config skip_children=true `find . -name "*.rs" \! -name format.rs`
+          cargo fmt -p parquet -- --check --config skip_children=true `find . -name "*.rs" \! -name format.rs`
       - name: Format object_store
         working-directory: object_store
         run: cargo fmt --all -- --check

From 565c24b8071269b02c3937e34c51eacf0f4cbad6 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 18 Dec 2024 05:52:40 -0500
Subject: [PATCH 07/11] Minor: add comments explaining bad MSRV, output in
 json (#6857)

* Minor: add comments explaining bad MSRV

* purposely introduce msrv break

* output in JSON format

* Revert "purposely introduce msrv break"

This reverts commit 61872b69a5a85748031fe852e48b8e3d3381d270.
---
 .github/workflows/rust.yml | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 72a53263d330..044250b70435 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -113,7 +113,7 @@ jobs:
         run: cargo fmt --all -- --check
 
   msrv:
-    name: Verify MSRV
+    name: Verify MSRV (Minimum Supported Rust Version)
     runs-on: ubuntu-latest
     container:
       image: amd64/rust
@@ -127,13 +127,19 @@ jobs:
         run: cargo update -p ahash --precise 0.8.7
       - name: Check arrow
         working-directory: arrow
-        run: cargo msrv --log-target stdout verify
+        run: |
+          # run `cd arrow; cargo msrv verify` to see problematic dependencies
+          cargo msrv verify --output-format=json
       - name: Check parquet
         working-directory: parquet
-        run: cargo msrv --log-target stdout verify
+        run: |
+          # run `cd parquet; cargo msrv verify` to see problematic dependencies
+          cargo msrv verify --output-format=json
       - name: Check arrow-flight
         working-directory: arrow-flight
-        run: cargo msrv --log-target stdout verify
+        run: |
+          # run `cd arrow-flight; cargo msrv verify` to see problematic dependencies
+          cargo msrv verify --output-format=json
       - name: Downgrade object_store dependencies
         working-directory: object_store
         # Necessary because tokio 1.30.0 updates MSRV to 1.63
         run: |
           cargo update -p tokio --precise 1.29.1
           cargo update -p url --precise 2.5.0
       - name: Check object_store
         working-directory: object_store
-        run: cargo msrv --log-target stdout verify
+        run: |
+          # run `cd object_store; cargo msrv verify` to see problematic dependencies
+          cargo msrv verify --output-format=json

From 61b7876d7a7d9de93e9db37bc96f87b1d646037f Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 18 Dec 2024 07:25:59 -0500
Subject: [PATCH 08/11] Add 53.4.0 to release
schedule (#6896)

* Add 53.4.0 to release schedule

* prettier
---
 README.md | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/README.md b/README.md
index 57794b1d6a46..f995ff6ad478 100644
--- a/README.md
+++ b/README.md
@@ -63,13 +63,14 @@ is described in the [contributing] guide.
 
 Planned Release Schedule
 
-| Approximate Date | Version  | Notes                                    |
-| ---------------- | -------- | ---------------------------------------- |
-| Nov 2024         | `53.3.0` | Minor, NO breaking API changes           |
-| Dec 2024         | `54.0.0` | Major, potentially breaking API changes  |
-| Jan 2025         | `54.1.0` | Minor, NO breaking API changes           |
-| Feb 2025         | `54.2.0` | Minor, NO breaking API changes           |
-| Mar 2025         | `55.0.0` | Major, potentially breaking API changes  |
+| Approximate Date | Version  | Notes                                       |
+| ---------------- | -------- | ------------------------------------------- |
+| Nov 2024         | `53.3.0` | Minor, NO breaking API changes              |
+| Dec 2024         | `54.0.0` | Major, potentially breaking API changes     |
+| Jan 2025         | `53.4.0` | Minor, NO breaking API changes (`53` line)  |
+| Jan 2025         | `54.1.0` | Minor, NO breaking API changes              |
+| Feb 2025         | `54.2.0` | Minor, NO breaking API changes              |
+| Mar 2025         | `55.0.0` | Major, potentially breaking API changes     |
 
 [this ticket]: https://github.com/apache/arrow-rs/issues/5368
 [semantic versioning]: https://semver.org/

From 13f3f213360441e6e7baaa1b98d23a0690d9572e Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 18 Dec 2024 07:49:51 -0500
Subject: [PATCH 09/11] Add deprecation / API removal policy (#6852)

* Add deprecation / API removal policy

* Increase proposal to 2 releases

* change from policy to guidelines, add flexibility

* prettier

* Make instructions more actionable
---
 README.md         | 27 +++++++++++++++++++++++++++
 arrow/README.md   |  2 +-
 parquet/README.md |  2 +-
 3 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index f995ff6ad478..723249ad29e5 100644
--- a/README.md
+++ b/README.md
@@ -83,6 +83,33 @@ versions approximately every 2 months.
 
 [`object_store`]: https://crates.io/crates/object_store
 
+### Deprecation Guidelines
+
+Minor releases may deprecate, but not remove, APIs. Deprecating APIs allows
+downstream Rust programs to still compile, but generate compiler warnings. This
+gives downstream crates time to migrate prior to API removal.
+
+To deprecate an API:
+
+- Mark the API as deprecated using `#[deprecated]` and specify the exact arrow-rs version in which it was deprecated
+- Concisely describe the preferred API to help the user transition
+
+The deprecated version is the next version which will be released (please
+consult the list above). To mark the API as deprecated, use the
+`#[deprecated(since = "...", note = "...")]` attribute.
+
+For example:
+
+```rust
+#[deprecated(since = "51.0.0", note = "Use `date_part` instead")]
+```
+
+In general, deprecated APIs will remain in the codebase for at least two major
+releases after they were deprecated (typically 6 to 9 months later). For
+example, an API deprecated in `51.3.0` can be removed in `54.0.0` (or later).
+Deprecated APIs may be removed earlier or later than these guidelines at the
+discretion of the maintainers.
+
 ## Related Projects
 
 There are several related crates in different repositories
diff --git a/arrow/README.md b/arrow/README.md
index a1444005ec00..79aefaae9053 100644
--- a/arrow/README.md
+++ b/arrow/README.md
@@ -37,7 +37,7 @@ This crate is tested with the latest stable version of Rust.
We do not currently
 
 The `arrow` crate follows the [SemVer standard] defined by Cargo and works
 well within the Rust crate ecosystem. See the [repository README] for more
 details on
-the release schedule and version.
+the release schedule, version and deprecation policy.
 
 [SemVer standard]: https://doc.rust-lang.org/cargo/reference/semver.html
 [repository README]: https://github.com/apache/arrow-rs
diff --git a/parquet/README.md b/parquet/README.md
index e9f52ff279d5..9ff1d921d692 100644
--- a/parquet/README.md
+++ b/parquet/README.md
@@ -36,7 +36,7 @@ This crate is tested with the latest stable version of Rust. We do not currently
 
 The `parquet` crate follows the [SemVer standard] defined by Cargo and works
 well within the Rust crate ecosystem. See the [repository README] for more
 details on
-the release schedule and version.
+the release schedule, version and deprecation policy.
 
 [semver standard]: https://doc.rust-lang.org/cargo/reference/semver.html
 [repository readme]: https://github.com/apache/arrow-rs

From cbe176533b227f31f467b9c9b512d65a8406ce10 Mon Sep 17 00:00:00 2001
From: Ed Seidl
Date: Wed, 18 Dec 2024 05:57:53 -0800
Subject: [PATCH 10/11] Enable string-based column projections from Parquet
 files (#6871)

* add function to create ProjectionMask from column names

* add some more tests
---
 parquet/src/arrow/arrow_reader/mod.rs |  68 ++++++++++
 parquet/src/arrow/mod.rs              | 178 +++++++++++++++++++++++++-
 parquet/src/arrow/schema/mod.rs       |  11 ++
 3 files changed, 256 insertions(+), 1 deletion(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 378884a1c430..6eba04c86f91 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -989,6 +989,21 @@ mod tests {
         assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
     }
 
+    #[test]
+    fn test_arrow_reader_single_column_by_name() {
+        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
+
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+        let original_schema = Arc::clone(builder.schema());
+
+        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
+        let reader = builder.with_projection(mask).build().unwrap();
+
+        // Verify that the schema was correctly parsed
+        assert_eq!(1, reader.schema().fields().len());
+        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
+    }
+
     #[test]
     fn test_null_column_reader_test() {
         let mut file = tempfile::tempfile().unwrap();
@@ -2563,6 +2578,59 @@ mod tests {
         }
     }
 
+    #[test]
+    // same as test_read_structs but constructs projection mask via column names
+    fn test_read_structs_by_name() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/nested_structs.rust.parquet");
+        let file = File::open(&path).unwrap();
+        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
+
+        for batch in record_batch_reader {
+            batch.unwrap();
+        }
+
+        let file = File::open(&path).unwrap();
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+
+        let mask = ProjectionMask::columns(
+            builder.parquet_schema(),
+            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
+        );
+        let projected_reader = builder
+            .with_projection(mask)
+            .with_batch_size(60)
+            .build()
+            .unwrap();
+
+        let expected_schema = Schema::new(vec![
+            Field::new(
+                "roll_num",
+                ArrowDataType::Struct(Fields::from(vec![Field::new(
+                    "count",
+                    ArrowDataType::UInt64,
+                    false,
+                )])),
+                false,
+            ),
+            Field::new(
"PC_CUR", + ArrowDataType::Struct(Fields::from(vec![ + Field::new("mean", ArrowDataType::Int64, false), + Field::new("sum", ArrowDataType::Int64, false), + ])), + false, + ), + ]); + + assert_eq!(&expected_schema, projected_reader.schema().as_ref()); + + for batch in projected_reader { + let batch = batch.unwrap(); + assert_eq!(batch.schema().as_ref(), &expected_schema); + } + } + #[test] fn test_read_maps() { let testdata = arrow::util::test_util::parquet_test_data(); diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index d77436bc1ff7..6777e00fb05c 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -108,12 +108,14 @@ pub mod async_writer; mod record_reader; experimental!(mod schema); +use std::sync::Arc; + pub use self::arrow_writer::ArrowWriter; #[cfg(feature = "async")] pub use self::async_reader::ParquetRecordBatchStreamBuilder; #[cfg(feature = "async")] pub use self::async_writer::AsyncArrowWriter; -use crate::schema::types::SchemaDescriptor; +use crate::schema::types::{SchemaDescriptor, Type}; use arrow_schema::{FieldRef, Schema}; // continue to export deprecated methods until they are removed @@ -210,6 +212,71 @@ impl ProjectionMask { Self { mask: Some(mask) } } + // Given a starting point in the schema, do a DFS for that node adding leaf paths to `paths`. + fn find_leaves(root: &Arc, parent: Option<&String>, paths: &mut Vec) { + let path = parent + .map(|p| [p, root.name()].join(".")) + .unwrap_or(root.name().to_string()); + if root.is_group() { + for child in root.get_fields() { + Self::find_leaves(child, Some(&path), paths); + } + } else { + // Reached a leaf, add to paths + paths.push(path); + } + } + + /// Create a [`ProjectionMask`] which selects only the named columns + /// + /// All leaf columns that fall below a given name will be selected. For example, given + /// the schema + /// ```ignore + /// message schema { + /// OPTIONAL group a (MAP) { + /// REPEATED group key_value { + /// REQUIRED BYTE_ARRAY key (UTF8); // leaf index 0 + /// OPTIONAL group value (MAP) { + /// REPEATED group key_value { + /// REQUIRED INT32 key; // leaf index 1 + /// REQUIRED BOOLEAN value; // leaf index 2 + /// } + /// } + /// } + /// } + /// REQUIRED INT32 b; // leaf index 3 + /// REQUIRED DOUBLE c; // leaf index 4 + /// } + /// ``` + /// `["a.key_value.value", "c"]` would return leaf columns 1, 2, and 4. `["a"]` would return + /// columns 0, 1, and 2. + /// + /// Note: repeated or out of order indices will not impact the final mask. + /// + /// i.e. `["b", "c"]` will construct the same mask as `["c", "b", "c"]`. 
+    pub fn columns<'a>(
+        schema: &SchemaDescriptor,
+        names: impl IntoIterator<Item = &'a str>,
+    ) -> Self {
+        // first make vector of paths for leaf columns
+        let mut paths: Vec<String> = vec![];
+        for root in schema.root_schema().get_fields() {
+            Self::find_leaves(root, None, &mut paths);
+        }
+        assert_eq!(paths.len(), schema.num_columns());
+
+        let mut mask = vec![false; schema.num_columns()];
+        for name in names {
+            for idx in 0..schema.num_columns() {
+                if paths[idx].starts_with(name) {
+                    mask[idx] = true;
+                }
+            }
+        }
+
+        Self { mask: Some(mask) }
+    }
+
     /// Returns true if the leaf column `leaf_idx` is included by the mask
     pub fn leaf_included(&self, leaf_idx: usize) -> bool {
         self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
@@ -246,10 +313,14 @@ mod test {
     use crate::arrow::ArrowWriter;
     use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
     use crate::file::properties::{EnabledStatistics, WriterProperties};
+    use crate::schema::parser::parse_message_type;
+    use crate::schema::types::SchemaDescriptor;
     use arrow_array::{ArrayRef, Int32Array, RecordBatch};
     use bytes::Bytes;
     use std::sync::Arc;
 
+    use super::ProjectionMask;
+
     #[test]
     // Reproducer for https://github.com/apache/arrow-rs/issues/6464
     fn test_metadata_read_write_partial_offset() {
@@ -375,4 +446,109 @@ mod test {
             .unwrap();
         Bytes::from(buf)
     }
+
+    #[test]
+    fn test_mask_from_column_names() {
+        let message_type = "
+            message test_schema {
+                OPTIONAL group a (MAP) {
+                    REPEATED group key_value {
+                        REQUIRED BYTE_ARRAY key (UTF8);
+                        OPTIONAL group value (MAP) {
+                            REPEATED group key_value {
+                                REQUIRED INT32 key;
+                                REQUIRED BOOLEAN value;
+                            }
+                        }
+                    }
+                }
+                REQUIRED INT32 b;
+                REQUIRED DOUBLE c;
+            }
+        ";
+        let parquet_group_type = parse_message_type(message_type).unwrap();
+        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
+
+        let mask = ProjectionMask::columns(&schema, ["foo", "bar"]);
+        assert_eq!(mask.mask.unwrap(), vec![false; 5]);
+
+        let mask = ProjectionMask::columns(&schema, []);
+        assert_eq!(mask.mask.unwrap(), vec![false; 5]);
+
+        let mask = ProjectionMask::columns(&schema, ["a", "c"]);
+        assert_eq!(mask.mask.unwrap(), [true, true, true, false, true]);
+
+        let mask = ProjectionMask::columns(&schema, ["a.key_value.key", "c"]);
+        assert_eq!(mask.mask.unwrap(), [true, false, false, false, true]);
+
+        let mask = ProjectionMask::columns(&schema, ["a.key_value.value", "b"]);
+        assert_eq!(mask.mask.unwrap(), [false, true, true, true, false]);
+
+        let message_type = "
+            message test_schema {
+                OPTIONAL group a (LIST) {
+                    REPEATED group list {
+                        OPTIONAL group element (LIST) {
+                            REPEATED group list {
+                                OPTIONAL group element (LIST) {
+                                    REPEATED group list {
+                                        OPTIONAL BYTE_ARRAY element (UTF8);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+                REQUIRED INT32 b;
+            }
+        ";
+        let parquet_group_type = parse_message_type(message_type).unwrap();
+        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
+
+        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
+        assert_eq!(mask.mask.unwrap(), [true, true]);
+
+        let mask = ProjectionMask::columns(&schema, ["a.list.element", "b"]);
+        assert_eq!(mask.mask.unwrap(), [true, true]);
+
+        let mask =
+            ProjectionMask::columns(&schema, ["a.list.element.list.element.list.element", "b"]);
+        assert_eq!(mask.mask.unwrap(), [true, true]);
+
+        let mask = ProjectionMask::columns(&schema, ["b"]);
+        assert_eq!(mask.mask.unwrap(), [false, true]);
+
+        let message_type = "
+            message test_schema {
+                OPTIONAL INT32 a;
+                OPTIONAL INT32 b;
+                OPTIONAL INT32 c;
+                OPTIONAL INT32 d;
+                OPTIONAL
INT32 e;
+            }
+        ";
+        let parquet_group_type = parse_message_type(message_type).unwrap();
+        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
+
+        let mask = ProjectionMask::columns(&schema, ["a", "b"]);
+        assert_eq!(mask.mask.unwrap(), [true, true, false, false, false]);
+
+        let mask = ProjectionMask::columns(&schema, ["d", "b", "d"]);
+        assert_eq!(mask.mask.unwrap(), [false, true, false, true, false]);
+
+        let message_type = "
+            message test_schema {
+                OPTIONAL INT32 a;
+                OPTIONAL INT32 b;
+                OPTIONAL INT32 a;
+                OPTIONAL INT32 d;
+                OPTIONAL INT32 e;
+            }
+        ";
+        let parquet_group_type = parse_message_type(message_type).unwrap();
+        let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
+
+        let mask = ProjectionMask::columns(&schema, ["a", "e"]);
+        assert_eq!(mask.mask.unwrap(), [true, false, true, false, true]);
+    }
 }
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 5d3d7b2a6541..212dec525833 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -1399,6 +1399,17 @@ mod tests {
         for i in 0..arrow_fields.len() {
             assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
         }
+
+        let mask =
+            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
+        let converted_arrow_schema =
+            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
+        let converted_fields = converted_arrow_schema.fields();
+
+        assert_eq!(arrow_fields.len(), converted_fields.len());
+        for i in 0..arrow_fields.len() {
+            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
+        }
     }
 
     #[test]

From fc814bca6b743010bcde972e611c8ff8ea68c9f5 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Wed, 18 Dec 2024 21:58:19 +0800
Subject: [PATCH 11/11] doc: add comment for timezone string (#6899)

* doc: add comment for timezone string

Signed-off-by: xxchan

* Update arrow-schema/src/datatype.rs

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

---------

Signed-off-by: xxchan
Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
---
 arrow-schema/src/datatype.rs | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/arrow-schema/src/datatype.rs b/arrow-schema/src/datatype.rs
index a6333c804805..7cd53b13c73e 100644
--- a/arrow-schema/src/datatype.rs
+++ b/arrow-schema/src/datatype.rs
@@ -196,6 +196,14 @@ pub enum DataType {
     /// DataType::Timestamp(TimeUnit::Second, Some("literal".into()));
     /// DataType::Timestamp(TimeUnit::Second, Some("string".to_string().into()));
     /// ```
+    ///
+    /// Timezone string parsing
+    /// -----------------------
+    /// When feature `chrono-tz` is not enabled, allowed timezone strings are fixed offsets of the form "+09:00", "-09" or "+0930".
+    ///
+    /// When feature `chrono-tz` is enabled, additional strings supported by [chrono_tz](https://docs.rs/chrono-tz/latest/chrono_tz/)
+    /// are also allowed, which include [IANA database](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones)
+    /// timezones.
     Timestamp(TimeUnit, Option<Arc<str>>),
     /// A signed 32-bit date representing the elapsed time since UNIX epoch (1970-01-01)
     /// in days.
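The headline change in this series is PATCH 10/11, which lets readers project Parquet columns by dotted name rather than by leaf index. A minimal usage sketch, not taken from the patches: it assumes a hypothetical local file `data.parquet` and uses only the APIs shown in the diffs above (`ProjectionMask::columns`, `parquet_schema`, `with_projection`).

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;

fn main() {
    // "data.parquet" is a hypothetical example file, not one from the patches.
    let file = File::open("data.parquet").unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

    // Select leaves by dotted column name, as introduced in PATCH 10/11;
    // every leaf below a named group is included in the mask.
    let mask = ProjectionMask::columns(builder.parquet_schema(), ["b", "c"]);
    let reader = builder.with_projection(mask).build().unwrap();

    for batch in reader {
        let batch = batch.unwrap();
        println!("read {} rows", batch.num_rows());
    }
}
```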