From 4cd942aded9f9134d9056148776d6d0975f30907 Mon Sep 17 00:00:00 2001 From: xmakro Date: Thu, 22 Aug 2024 16:25:07 -0700 Subject: [PATCH 01/15] Parquet: Verify 32-bit CRC checksum when decoding pages --- parquet/Cargo.toml | 155 ++++++++++++++++++-------- parquet/src/errors.rs | 2 + parquet/src/file/serialized_reader.rs | 9 ++ 3 files changed, 120 insertions(+), 46 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index b97b2a571646..91fdc0246548 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -16,23 +16,27 @@ # under the License. [package] -name = "parquet" -version = { workspace = true } -license = { workspace = true } +authors = { workspace = true } description = "Apache Parquet implementation in Rust" +edition = { workspace = true } homepage = { workspace = true } -repository = { workspace = true } -authors = { workspace = true } keywords = ["arrow", "parquet", "hadoop"] +license = { workspace = true } +name = "parquet" readme = "README.md" -edition = { workspace = true } +repository = { workspace = true } rust-version = "1.70.0" +version = { workspace = true } [target.'cfg(target_arch = "wasm32")'.dependencies] -ahash = { version = "0.8", default-features = false, features = ["compile-time-rng"] } +ahash = { version = "0.8", default-features = false, features = [ + "compile-time-rng", +] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } +ahash = { version = "0.8", default-features = false, features = [ + "runtime-rng", +] } [dependencies] arrow-array = { workspace = true, optional = true } @@ -40,49 +44,98 @@ arrow-buffer = { workspace = true, optional = true } arrow-cast = { workspace = true, optional = true } arrow-csv = { workspace = true, optional = true } arrow-data = { workspace = true, optional = true } +arrow-ipc = { workspace = true, optional = true } arrow-schema = { workspace = true, optional = true } arrow-select = { workspace = true, optional = true } -arrow-ipc = { workspace = true, optional = true } # Intentionally not a path dependency as object_store is released separately object_store = { version = "0.11.0", default-features = false, optional = true } +base64 = { version = "0.22", default-features = false, features = [ + "std", +], optional = true } +brotli = { version = "6.0", default-features = false, features = [ + "std", +], optional = true } bytes = { version = "1.1", default-features = false, features = ["std"] } -thrift = { version = "0.17", default-features = false } -snap = { version = "1.0", default-features = false, optional = true } -brotli = { version = "6.0", default-features = false, features = ["std"], optional = true } -flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true } -lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true } -zstd = { version = "0.13", optional = true, default-features = false } chrono = { workspace = true } +clap = { version = "4.1", default-features = false, features = [ + "std", + "derive", + "env", + "help", + "error-context", + "usage", +], optional = true } +crc32fast = { version = "1.4", optional = true, default-features = false } +flate2 = { version = "1.0", default-features = false, features = [ + "rust_backend", +], optional = true } +futures = { version = "0.3", default-features = false, features = [ + "std", +], optional = true } +half = { version = "2.1", default-features = false, features = ["num-traits"] } +hashbrown = { version = "0.14", default-features = false } +lz4_flex = { version = "0.11", default-features = false, features = [ + "std", + "frame", +], optional = true } num = { version = "0.4", default-features = false } num-bigint = { version = "0.4", default-features = false } -base64 = { version = "0.22", default-features = false, features = ["std", ], optional = true } -clap = { version = "4.1", default-features = false, features = ["std", "derive", "env", "help", "error-context", "usage"], optional = true } -serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } -serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true } +paste = { version = "1.0" } seq-macro = { version = "0.3", default-features = false } -futures = { version = "0.3", default-features = false, features = ["std"], optional = true } -tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] } -hashbrown = { version = "0.14", default-features = false } +serde = { version = "1.0", default-features = false, features = [ + "derive", +], optional = true } +serde_json = { version = "1.0", default-features = false, features = [ + "std", +], optional = true } +snap = { version = "1.0", default-features = false, optional = true } +sysinfo = { version = "0.31.2", optional = true, default-features = false, features = [ + "system", +] } +thrift = { version = "0.17", default-features = false } +tokio = { version = "1.0", optional = true, default-features = false, features = [ + "macros", + "rt", + "io-util", +] } twox-hash = { version = "1.6", default-features = false } -paste = { version = "1.0" } -half = { version = "2.1", default-features = false, features = ["num-traits"] } -sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] } +zstd = { version = "0.13", optional = true, default-features = false } [dev-dependencies] +arrow = { workspace = true, features = [ + "ipc", + "test_utils", + "prettyprint", + "json", +] } base64 = { version = "0.22", default-features = false, features = ["std"] } +brotli = { version = "6.0", default-features = false, features = ["std"] } criterion = { version = "0.5", default-features = false } +flate2 = { version = "1.0", default-features = false, features = [ + "rust_backend", +] } +lz4_flex = { version = "0.11", default-features = false, features = [ + "std", + "frame", +] } +object_store = { version = "0.11.0", default-features = false, features = [ + "azure", +] } +rand = { version = "0.8", default-features = false, features = [ + "std", + "std_rng", +] } +serde_json = { version = "1.0", features = ["std"], default-features = false } snap = { version = "1.0", default-features = false } tempfile = { version = "3.0", default-features = false } -brotli = { version = "6.0", default-features = false, features = ["std"] } -flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] } -lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"] } +tokio = { version = "1.0", default-features = false, features = [ + "macros", + "rt", + "io-util", + "fs", +] } zstd = { version = "0.13", default-features = false } -serde_json = { version = "1.0", features = ["std"], default-features = false } -arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] } -tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] } -rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } -object_store = { version = "0.11.0", default-features = false, features = ["azure"] } # TODO: temporary to fix parquet wasm build # upstream issue: https://github.com/gyscos/zstd-rs/issues/269 @@ -100,7 +153,16 @@ default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] # Enable lz4 lz4 = ["lz4_flex"] # Enable arrow reader/writer APIs -arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"] +arrow = [ + "base64", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "arrow-select", + "arrow-ipc", +] # Enable CLI tools cli = ["json", "base64", "clap", "arrow-csv", "serde"] # Enable JSON APIs @@ -117,26 +179,28 @@ object_store = ["dep:object_store", "async"] zstd = ["dep:zstd", "zstd-sys"] # Display memory in example/write_parquet.rs sysinfo = ["dep:sysinfo"] +# Verify 32-bit CRC checksum when decoding parquet pages +crc = ["crc32fast"] [[example]] name = "read_parquet" -required-features = ["arrow"] path = "./examples/read_parquet.rs" +required-features = ["arrow"] [[example]] name = "write_parquet" -required-features = ["cli", "sysinfo"] path = "./examples/write_parquet.rs" +required-features = ["cli", "sysinfo"] [[example]] name = "async_read_parquet" -required-features = ["arrow", "async"] path = "./examples/async_read_parquet.rs" +required-features = ["arrow", "async"] [[example]] name = "read_with_rowgroup" -required-features = ["arrow", "async"] path = "./examples/read_with_rowgroup.rs" +required-features = ["arrow", "async"] [[test]] name = "arrow_writer_layout" @@ -144,8 +208,8 @@ required-features = ["arrow"] [[test]] name = "arrow_reader" -required-features = ["arrow"] path = "./tests/arrow_reader/mod.rs" +required-features = ["arrow"] [[bin]] name = "parquet-read" @@ -184,34 +248,33 @@ name = "parquet-index" required-features = ["cli"] [[bench]] +harness = false name = "arrow_writer" required-features = ["arrow"] -harness = false [[bench]] +harness = false name = "arrow_reader" required-features = ["arrow", "test_common", "experimental"] -harness = false [[bench]] +harness = false name = "arrow_statistics" required-features = ["arrow"] -harness = false - [[bench]] +harness = false name = "compression" required-features = ["experimental", "default"] -harness = false [[bench]] +harness = false name = "encoding" required-features = ["experimental", "default"] -harness = false [[bench]] -name = "metadata" harness = false +name = "metadata" [lib] bench = false diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs index a242c9768514..cddfe9ecd0fb 100644 --- a/parquet/src/errors.rs +++ b/parquet/src/errors.rs @@ -43,6 +43,7 @@ pub enum ParquetError { /// Returned when reading into arrow or writing from arrow. ArrowError(String), IndexOutOfBound(usize, usize), + Crc32Mismatch, /// An external error variant External(Box), } @@ -60,6 +61,7 @@ impl std::fmt::Display for ParquetError { ParquetError::IndexOutOfBound(index, ref bound) => { write!(fmt, "Index {index} out of bound: {bound}") } + ParquetError::Crc32Mismatch => write!(fmt, "Parquet Page crc32 mismatch"), ParquetError::External(e) => write!(fmt, "External: {e}"), } } diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 0a3e51931867..375b0049828f 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -391,6 +391,15 @@ pub(crate) fn decode_page( physical_type: Type, decompressor: Option<&mut Box>, ) -> Result { + // Verify the 32-bit CRC checksum of the page + #[cfg(feature = "crc")] + if let Some(expected_crc) = page_header.crc { + let crc = crc32fast::hash(&buffer); + if crc != expected_crc as u32 { + return Err(ParquetError::Crc32Mismatch); + } + } + // When processing data page v2, depending on enabled compression for the // page, we should account for uncompressed data ('offset') of // repetition and definition levels. From 7ef3b72eba44fa842c8b396b7a975f375529b841 Mon Sep 17 00:00:00 2001 From: xmakro Date: Thu, 22 Aug 2024 16:26:22 -0700 Subject: [PATCH 02/15] Undo cargo toml --- parquet/Cargo.toml | 154 ++++++++++++++------------------------------- 1 file changed, 47 insertions(+), 107 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 91fdc0246548..8062f56dbfab 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -16,27 +16,23 @@ # under the License. [package] -authors = { workspace = true } +name = "parquet" +version = { workspace = true } +license = { workspace = true } description = "Apache Parquet implementation in Rust" -edition = { workspace = true } homepage = { workspace = true } +repository = { workspace = true } +authors = { workspace = true } keywords = ["arrow", "parquet", "hadoop"] -license = { workspace = true } -name = "parquet" readme = "README.md" -repository = { workspace = true } +edition = { workspace = true } rust-version = "1.70.0" -version = { workspace = true } [target.'cfg(target_arch = "wasm32")'.dependencies] -ahash = { version = "0.8", default-features = false, features = [ - "compile-time-rng", -] } +ahash = { version = "0.8", default-features = false, features = ["compile-time-rng"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -ahash = { version = "0.8", default-features = false, features = [ - "runtime-rng", -] } +ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } [dependencies] arrow-array = { workspace = true, optional = true } @@ -44,98 +40,49 @@ arrow-buffer = { workspace = true, optional = true } arrow-cast = { workspace = true, optional = true } arrow-csv = { workspace = true, optional = true } arrow-data = { workspace = true, optional = true } -arrow-ipc = { workspace = true, optional = true } arrow-schema = { workspace = true, optional = true } arrow-select = { workspace = true, optional = true } +arrow-ipc = { workspace = true, optional = true } # Intentionally not a path dependency as object_store is released separately object_store = { version = "0.11.0", default-features = false, optional = true } -base64 = { version = "0.22", default-features = false, features = [ - "std", -], optional = true } -brotli = { version = "6.0", default-features = false, features = [ - "std", -], optional = true } bytes = { version = "1.1", default-features = false, features = ["std"] } +thrift = { version = "0.17", default-features = false } +snap = { version = "1.0", default-features = false, optional = true } +brotli = { version = "6.0", default-features = false, features = ["std"], optional = true } +flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true } +lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true } +zstd = { version = "0.13", optional = true, default-features = false } chrono = { workspace = true } -clap = { version = "4.1", default-features = false, features = [ - "std", - "derive", - "env", - "help", - "error-context", - "usage", -], optional = true } -crc32fast = { version = "1.4", optional = true, default-features = false } -flate2 = { version = "1.0", default-features = false, features = [ - "rust_backend", -], optional = true } -futures = { version = "0.3", default-features = false, features = [ - "std", -], optional = true } -half = { version = "2.1", default-features = false, features = ["num-traits"] } -hashbrown = { version = "0.14", default-features = false } -lz4_flex = { version = "0.11", default-features = false, features = [ - "std", - "frame", -], optional = true } num = { version = "0.4", default-features = false } num-bigint = { version = "0.4", default-features = false } -paste = { version = "1.0" } +base64 = { version = "0.22", default-features = false, features = ["std", ], optional = true } +clap = { version = "4.1", default-features = false, features = ["std", "derive", "env", "help", "error-context", "usage"], optional = true } +serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } +serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true } seq-macro = { version = "0.3", default-features = false } -serde = { version = "1.0", default-features = false, features = [ - "derive", -], optional = true } -serde_json = { version = "1.0", default-features = false, features = [ - "std", -], optional = true } -snap = { version = "1.0", default-features = false, optional = true } -sysinfo = { version = "0.31.2", optional = true, default-features = false, features = [ - "system", -] } -thrift = { version = "0.17", default-features = false } -tokio = { version = "1.0", optional = true, default-features = false, features = [ - "macros", - "rt", - "io-util", -] } +futures = { version = "0.3", default-features = false, features = ["std"], optional = true } +tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] } +hashbrown = { version = "0.14", default-features = false } twox-hash = { version = "1.6", default-features = false } -zstd = { version = "0.13", optional = true, default-features = false } +paste = { version = "1.0" } +half = { version = "2.1", default-features = false, features = ["num-traits"] } +sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] } [dev-dependencies] -arrow = { workspace = true, features = [ - "ipc", - "test_utils", - "prettyprint", - "json", -] } base64 = { version = "0.22", default-features = false, features = ["std"] } -brotli = { version = "6.0", default-features = false, features = ["std"] } criterion = { version = "0.5", default-features = false } -flate2 = { version = "1.0", default-features = false, features = [ - "rust_backend", -] } -lz4_flex = { version = "0.11", default-features = false, features = [ - "std", - "frame", -] } -object_store = { version = "0.11.0", default-features = false, features = [ - "azure", -] } -rand = { version = "0.8", default-features = false, features = [ - "std", - "std_rng", -] } -serde_json = { version = "1.0", features = ["std"], default-features = false } snap = { version = "1.0", default-features = false } tempfile = { version = "3.0", default-features = false } -tokio = { version = "1.0", default-features = false, features = [ - "macros", - "rt", - "io-util", - "fs", -] } +brotli = { version = "6.0", default-features = false, features = ["std"] } +flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] } +lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"] } zstd = { version = "0.13", default-features = false } +serde_json = { version = "1.0", features = ["std"], default-features = false } +arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] } +tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] } +rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } +object_store = { version = "0.11.0", default-features = false, features = ["azure"] } # TODO: temporary to fix parquet wasm build # upstream issue: https://github.com/gyscos/zstd-rs/issues/269 @@ -153,16 +100,7 @@ default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] # Enable lz4 lz4 = ["lz4_flex"] # Enable arrow reader/writer APIs -arrow = [ - "base64", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", - "arrow-schema", - "arrow-select", - "arrow-ipc", -] +arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"] # Enable CLI tools cli = ["json", "base64", "clap", "arrow-csv", "serde"] # Enable JSON APIs @@ -184,23 +122,23 @@ crc = ["crc32fast"] [[example]] name = "read_parquet" -path = "./examples/read_parquet.rs" required-features = ["arrow"] +path = "./examples/read_parquet.rs" [[example]] name = "write_parquet" -path = "./examples/write_parquet.rs" required-features = ["cli", "sysinfo"] +path = "./examples/write_parquet.rs" [[example]] name = "async_read_parquet" -path = "./examples/async_read_parquet.rs" required-features = ["arrow", "async"] +path = "./examples/async_read_parquet.rs" [[example]] name = "read_with_rowgroup" -path = "./examples/read_with_rowgroup.rs" required-features = ["arrow", "async"] +path = "./examples/read_with_rowgroup.rs" [[test]] name = "arrow_writer_layout" @@ -208,8 +146,8 @@ required-features = ["arrow"] [[test]] name = "arrow_reader" -path = "./tests/arrow_reader/mod.rs" required-features = ["arrow"] +path = "./tests/arrow_reader/mod.rs" [[bin]] name = "parquet-read" @@ -248,33 +186,35 @@ name = "parquet-index" required-features = ["cli"] [[bench]] -harness = false name = "arrow_writer" required-features = ["arrow"] +harness = false [[bench]] -harness = false name = "arrow_reader" required-features = ["arrow", "test_common", "experimental"] +harness = false [[bench]] -harness = false name = "arrow_statistics" required-features = ["arrow"] +harness = false + [[bench]] -harness = false name = "compression" required-features = ["experimental", "default"] +harness = false [[bench]] -harness = false name = "encoding" required-features = ["experimental", "default"] +harness = false [[bench]] -harness = false name = "metadata" +harness = false [lib] bench = false + From d64e8318437ddd82cdb2aecd7642d0ffb0989d45 Mon Sep 17 00:00:00 2001 From: xmakro Date: Thu, 22 Aug 2024 16:29:16 -0700 Subject: [PATCH 03/15] a --- parquet/src/errors.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs index cddfe9ecd0fb..a727f33cb7b5 100644 --- a/parquet/src/errors.rs +++ b/parquet/src/errors.rs @@ -61,7 +61,9 @@ impl std::fmt::Display for ParquetError { ParquetError::IndexOutOfBound(index, ref bound) => { write!(fmt, "Index {index} out of bound: {bound}") } - ParquetError::Crc32Mismatch => write!(fmt, "Parquet Page crc32 mismatch"), + ParquetError::Crc32Mismatch => { + write!(fmt, "Page CRC checksum mismatch") + } ParquetError::External(e) => write!(fmt, "External: {e}"), } } From 5ee97bd47ada17b2068127246ac799fa519fff58 Mon Sep 17 00:00:00 2001 From: xmakro Date: Thu, 22 Aug 2024 16:29:59 -0700 Subject: [PATCH 04/15] enable crc by default --- parquet/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 8062f56dbfab..273a5ecdb47b 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -96,7 +96,7 @@ zstd-sys = { version = ">=2.0.0, <2.0.14", default-features = false } all-features = true [features] -default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] +default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64", "crc"] # Enable lz4 lz4 = ["lz4_flex"] # Enable arrow reader/writer APIs From 52bdce43ae816de32cdc7f257ad8a1a563a5c9a5 Mon Sep 17 00:00:00 2001 From: xmakro Date: Thu, 22 Aug 2024 16:35:44 -0700 Subject: [PATCH 05/15] a --- parquet/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 273a5ecdb47b..9a27ef4fcd83 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -118,7 +118,7 @@ zstd = ["dep:zstd", "zstd-sys"] # Display memory in example/write_parquet.rs sysinfo = ["dep:sysinfo"] # Verify 32-bit CRC checksum when decoding parquet pages -crc = ["crc32fast"] +crc = ["dep:crc32fast"] [[example]] name = "read_parquet" From 48b9241f95a230fead80976ee458418789e8a74e Mon Sep 17 00:00:00 2001 From: xmakro Date: Sun, 25 Aug 2024 19:54:50 -0700 Subject: [PATCH 06/15] Address comments --- parquet/Cargo.toml | 3 ++- parquet/src/errors.rs | 4 ---- parquet/src/file/serialized_reader.rs | 4 +++- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 9a27ef4fcd83..001750207bc4 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -68,6 +68,7 @@ twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] } +crc32fast = { version = "1.4.2", optional = true, default-features = false } [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -96,7 +97,7 @@ zstd-sys = { version = ">=2.0.0, <2.0.14", default-features = false } all-features = true [features] -default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64", "crc"] +default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] # Enable lz4 lz4 = ["lz4_flex"] # Enable arrow reader/writer APIs diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs index a727f33cb7b5..a242c9768514 100644 --- a/parquet/src/errors.rs +++ b/parquet/src/errors.rs @@ -43,7 +43,6 @@ pub enum ParquetError { /// Returned when reading into arrow or writing from arrow. ArrowError(String), IndexOutOfBound(usize, usize), - Crc32Mismatch, /// An external error variant External(Box), } @@ -61,9 +60,6 @@ impl std::fmt::Display for ParquetError { ParquetError::IndexOutOfBound(index, ref bound) => { write!(fmt, "Index {index} out of bound: {bound}") } - ParquetError::Crc32Mismatch => { - write!(fmt, "Page CRC checksum mismatch") - } ParquetError::External(e) => write!(fmt, "External: {e}"), } } diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 375b0049828f..c6b123ffdcb3 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -396,7 +396,9 @@ pub(crate) fn decode_page( if let Some(expected_crc) = page_header.crc { let crc = crc32fast::hash(&buffer); if crc != expected_crc as u32 { - return Err(ParquetError::Crc32Mismatch); + return Err(ParquetError::General( + "Page CRC checksum mismatch".to_string(), + )); } } From da633e4ece12f51e74bf247c4120385acf5a6f7a Mon Sep 17 00:00:00 2001 From: xmakro Date: Sat, 31 Aug 2024 21:30:35 -0700 Subject: [PATCH 07/15] Add tests that verify crc checks --- parquet/tests/arrow_reader/checksum.rs | 55 ++++++++++++++++++++++++++ parquet/tests/arrow_reader/mod.rs | 2 + 2 files changed, 57 insertions(+) create mode 100644 parquet/tests/arrow_reader/checksum.rs diff --git a/parquet/tests/arrow_reader/checksum.rs b/parquet/tests/arrow_reader/checksum.rs new file mode 100644 index 000000000000..afde91eb4498 --- /dev/null +++ b/parquet/tests/arrow_reader/checksum.rs @@ -0,0 +1,55 @@ +use std::path::PathBuf; + +use arrow::util::test_util::parquet_test_data; +use parquet::arrow::arrow_reader::ArrowReaderBuilder; + +#[test] +fn test_datapage_v1_corrupt_checksum() { + let errors = read_file_batch_errors("datapage_v1-corrupt-checksum.parquet"); + assert_eq!(errors, [ + Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()), + Ok(()), + Ok(()), + Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()), + Err("Parquet argument error: Parquet error: Not all children array length are the same!".to_string()) + ]); +} + +#[test] +fn test_datapage_v1_uncompressed_checksum() { + let errors = read_file_batch_errors("datapage_v1-uncompressed-checksum.parquet"); + assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]); +} + +#[test] +fn test_datapage_v1_snappy_compressed_checksum() { + let errors = read_file_batch_errors("datapage_v1-snappy-compressed-checksum.parquet"); + assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]); +} + + +#[test] +fn test_plain_dict_uncompressed_checksum() { + let errors = read_file_batch_errors("plain-dict-uncompressed-checksum.parquet"); + assert_eq!(errors, [Ok(())]); +} +#[test] +fn test_rle_dict_snappy_checksum() { + let errors = read_file_batch_errors("rle-dict-snappy-checksum.parquet"); + assert_eq!(errors, [Ok(())]); +} + +/// Reads a file and returns a vector with one element per record batch. +/// The record batch data is replaced with () and errors are stringified. +fn read_file_batch_errors(name: &str) -> Vec> { + let path = PathBuf::from(parquet_test_data()).join(name); + println!("Reading file: {:?}", path); + let file = std::fs::File::open(&path).unwrap(); + let reader = ArrowReaderBuilder::try_new(file).unwrap().build().unwrap(); + reader + .map(|x| match x { + Ok(_) => Ok(()), + Err(e) => Err(e.to_string()), + }) + .collect() +} diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index cc4c8f3c977b..0e6783583cd5 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -36,6 +36,8 @@ use std::sync::Arc; use tempfile::NamedTempFile; mod bad_data; +#[cfg(feature = "crc")] +mod checksum; mod statistics; // returns a struct array with columns "int32_col", "float32_col" and "float64_col" with the specified values From e78271d77cf337840ae5faf8ab9c7a51a175a9b5 Mon Sep 17 00:00:00 2001 From: xmakro Date: Sat, 31 Aug 2024 21:30:43 -0700 Subject: [PATCH 08/15] Document feature flag --- parquet/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parquet/README.md b/parquet/README.md index 0360d15db14f..3591894e5c85 100644 --- a/parquet/README.md +++ b/parquet/README.md @@ -55,6 +55,7 @@ The `parquet` crate provides the following features which may be enabled in your - `async` - support `async` APIs for reading parquet - `json` - support for reading / writing `json` data to / from parquet - `brotli` (default) - support for parquet using `brotli` compression +- `crc` - verifies checksums when reading data pages - `flate2` (default) - support for parquet using `gzip` compression - `lz4` (default) - support for parquet using `lz4` compression - `zstd` (default) - support for parquet using `zstd` compression @@ -82,4 +83,4 @@ The `parquet` crate provides the following features which may be enabled in your ## License -Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0. +Licensed under the Apache License, Version 2.0: . From 2592b17dcb0544bf7f643d9be0f6c26ebaffdd77 Mon Sep 17 00:00:00 2001 From: xmakro Date: Sat, 31 Aug 2024 21:31:22 -0700 Subject: [PATCH 09/15] Move documentation around --- parquet/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/README.md b/parquet/README.md index 3591894e5c85..0d55a255f030 100644 --- a/parquet/README.md +++ b/parquet/README.md @@ -55,12 +55,12 @@ The `parquet` crate provides the following features which may be enabled in your - `async` - support `async` APIs for reading parquet - `json` - support for reading / writing `json` data to / from parquet - `brotli` (default) - support for parquet using `brotli` compression -- `crc` - verifies checksums when reading data pages - `flate2` (default) - support for parquet using `gzip` compression - `lz4` (default) - support for parquet using `lz4` compression - `zstd` (default) - support for parquet using `zstd` compression - `snap` (default) - support for parquet using `snappy` compression - `cli` - parquet [CLI tools](https://github.com/apache/arrow-rs/tree/master/parquet/src/bin) +- `crc` - verifies checksums when reading data pages - `experimental` - Experimental APIs which may change, even between minor releases ## Parquet Feature Status From ee6e4c323a0dccdbc5d67dcc4210ef42cc852576 Mon Sep 17 00:00:00 2001 From: Makro <4398091+xmakro@users.noreply.github.com> Date: Sat, 21 Sep 2024 00:25:17 -0700 Subject: [PATCH 10/15] Update parquet/Cargo.toml Co-authored-by: Ed Seidl --- parquet/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 001750207bc4..1d38e67a0f02 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -218,4 +218,3 @@ harness = false [lib] bench = false - From 82c49efc9d0bb7f12c404910a3937e01b11a55bd Mon Sep 17 00:00:00 2001 From: Makro <4398091+xmakro@users.noreply.github.com> Date: Sat, 21 Sep 2024 00:25:25 -0700 Subject: [PATCH 11/15] Update parquet/src/file/serialized_reader.rs Co-authored-by: Ed Seidl --- parquet/src/file/serialized_reader.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index c6b123ffdcb3..ad857ee86d68 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -396,9 +396,7 @@ pub(crate) fn decode_page( if let Some(expected_crc) = page_header.crc { let crc = crc32fast::hash(&buffer); if crc != expected_crc as u32 { - return Err(ParquetError::General( - "Page CRC checksum mismatch".to_string(), - )); + return Err(general_err!("Page CRC checksum mismatch")); } } From 32f063a8d07992b3533346f13432b5aa4f841387 Mon Sep 17 00:00:00 2001 From: xmakro Date: Sat, 21 Sep 2024 00:27:26 -0700 Subject: [PATCH 12/15] Add license --- parquet/tests/arrow_reader/checksum.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/parquet/tests/arrow_reader/checksum.rs b/parquet/tests/arrow_reader/checksum.rs index afde91eb4498..50f7895073c4 100644 --- a/parquet/tests/arrow_reader/checksum.rs +++ b/parquet/tests/arrow_reader/checksum.rs @@ -1,3 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This file contains an end to end test for verifying checksums when reading parquet files. + use std::path::PathBuf; use arrow::util::test_util::parquet_test_data; @@ -27,7 +46,6 @@ fn test_datapage_v1_snappy_compressed_checksum() { assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]); } - #[test] fn test_plain_dict_uncompressed_checksum() { let errors = read_file_batch_errors("plain-dict-uncompressed-checksum.parquet"); From a76910cc741c526d62a6995d042933fbb78b5c10 Mon Sep 17 00:00:00 2001 From: xmakro Date: Sat, 21 Sep 2024 00:27:53 -0700 Subject: [PATCH 13/15] Run cargo +stable fmt --all --- parquet/tests/arrow_reader/checksum.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/tests/arrow_reader/checksum.rs b/parquet/tests/arrow_reader/checksum.rs index 50f7895073c4..c60908d8b95d 100644 --- a/parquet/tests/arrow_reader/checksum.rs +++ b/parquet/tests/arrow_reader/checksum.rs @@ -27,8 +27,8 @@ fn test_datapage_v1_corrupt_checksum() { let errors = read_file_batch_errors("datapage_v1-corrupt-checksum.parquet"); assert_eq!(errors, [ Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()), - Ok(()), - Ok(()), + Ok(()), + Ok(()), Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()), Err("Parquet argument error: Parquet error: Not all children array length are the same!".to_string()) ]); From 6fecf131a02459474afcc4f6a38865f7d3539d28 Mon Sep 17 00:00:00 2001 From: xmakro Date: Sat, 21 Sep 2024 00:29:10 -0700 Subject: [PATCH 14/15] Revert MD034 --- parquet/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/README.md b/parquet/README.md index 0d55a255f030..233406ac2f54 100644 --- a/parquet/README.md +++ b/parquet/README.md @@ -83,4 +83,4 @@ The `parquet` crate provides the following features which may be enabled in your ## License -Licensed under the Apache License, Version 2.0: . +Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0. From 581e757c91a55cd8fe07d1ecc3e9718a6b30f233 Mon Sep 17 00:00:00 2001 From: xmakro Date: Fri, 27 Sep 2024 19:34:50 -0700 Subject: [PATCH 15/15] Applye readme suggestion --- parquet/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/README.md b/parquet/README.md index 233406ac2f54..a0441ee6026d 100644 --- a/parquet/README.md +++ b/parquet/README.md @@ -60,7 +60,7 @@ The `parquet` crate provides the following features which may be enabled in your - `zstd` (default) - support for parquet using `zstd` compression - `snap` (default) - support for parquet using `snappy` compression - `cli` - parquet [CLI tools](https://github.com/apache/arrow-rs/tree/master/parquet/src/bin) -- `crc` - verifies checksums when reading data pages +- `crc` - enables functionality to automatically verify checksums of each page (if present) when decoding - `experimental` - Experimental APIs which may change, even between minor releases ## Parquet Feature Status @@ -83,4 +83,4 @@ The `parquet` crate provides the following features which may be enabled in your ## License -Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0. +Licensed under the Apache License, Version 2.0: .