From 8e0aaadb98a55152259b462ce90ac5f9307a0a41 Mon Sep 17 00:00:00 2001 From: Makro <4398091+xmakro@users.noreply.github.com> Date: Sat, 28 Sep 2024 01:41:02 -0700 Subject: [PATCH] Parquet: Verify 32-bit CRC checksum when decoding pages (#6290) * Parquet: Verify 32-bit CRC checksum when decoding pages * Undo cargo toml * a * enable crc by default * a * Address comments * Add tests that verify crc checks * Document feature flag * Move documentation around * Update parquet/Cargo.toml Co-authored-by: Ed Seidl * Update parquet/src/file/serialized_reader.rs Co-authored-by: Ed Seidl * Add license * Run cargo +stable fmt --all * Revert MD034 * Applye readme suggestion --------- Co-authored-by: xmakro Co-authored-by: Ed Seidl --- parquet/Cargo.toml | 3 ++ parquet/README.md | 3 +- parquet/src/file/serialized_reader.rs | 9 ++++ parquet/tests/arrow_reader/checksum.rs | 73 ++++++++++++++++++++++++++ parquet/tests/arrow_reader/mod.rs | 2 + 5 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 parquet/tests/arrow_reader/checksum.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index b97b2a571646..1d38e67a0f02 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -68,6 +68,7 @@ twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] } +crc32fast = { version = "1.4.2", optional = true, default-features = false } [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -117,6 +118,8 @@ object_store = ["dep:object_store", "async"] zstd = ["dep:zstd", "zstd-sys"] # Display memory in example/write_parquet.rs sysinfo = ["dep:sysinfo"] +# Verify 32-bit CRC checksum when decoding parquet pages +crc = ["dep:crc32fast"] [[example]] name = "read_parquet" diff --git a/parquet/README.md b/parquet/README.md index 0360d15db14f..a0441ee6026d 100644 --- a/parquet/README.md +++ b/parquet/README.md @@ -60,6 +60,7 @@ The `parquet` crate provides the following features which may be enabled in your - `zstd` (default) - support for parquet using `zstd` compression - `snap` (default) - support for parquet using `snappy` compression - `cli` - parquet [CLI tools](https://github.com/apache/arrow-rs/tree/master/parquet/src/bin) +- `crc` - enables functionality to automatically verify checksums of each page (if present) when decoding - `experimental` - Experimental APIs which may change, even between minor releases ## Parquet Feature Status @@ -82,4 +83,4 @@ The `parquet` crate provides the following features which may be enabled in your ## License -Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0. +Licensed under the Apache License, Version 2.0: . diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 6fb0f78c1613..b253b73a4fa0 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -390,6 +390,15 @@ pub(crate) fn decode_page( physical_type: Type, decompressor: Option<&mut Box>, ) -> Result { + // Verify the 32-bit CRC checksum of the page + #[cfg(feature = "crc")] + if let Some(expected_crc) = page_header.crc { + let crc = crc32fast::hash(&buffer); + if crc != expected_crc as u32 { + return Err(general_err!("Page CRC checksum mismatch")); + } + } + // When processing data page v2, depending on enabled compression for the // page, we should account for uncompressed data ('offset') of // repetition and definition levels. diff --git a/parquet/tests/arrow_reader/checksum.rs b/parquet/tests/arrow_reader/checksum.rs new file mode 100644 index 000000000000..c60908d8b95d --- /dev/null +++ b/parquet/tests/arrow_reader/checksum.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This file contains an end to end test for verifying checksums when reading parquet files. + +use std::path::PathBuf; + +use arrow::util::test_util::parquet_test_data; +use parquet::arrow::arrow_reader::ArrowReaderBuilder; + +#[test] +fn test_datapage_v1_corrupt_checksum() { + let errors = read_file_batch_errors("datapage_v1-corrupt-checksum.parquet"); + assert_eq!(errors, [ + Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()), + Ok(()), + Ok(()), + Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()), + Err("Parquet argument error: Parquet error: Not all children array length are the same!".to_string()) + ]); +} + +#[test] +fn test_datapage_v1_uncompressed_checksum() { + let errors = read_file_batch_errors("datapage_v1-uncompressed-checksum.parquet"); + assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]); +} + +#[test] +fn test_datapage_v1_snappy_compressed_checksum() { + let errors = read_file_batch_errors("datapage_v1-snappy-compressed-checksum.parquet"); + assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]); +} + +#[test] +fn test_plain_dict_uncompressed_checksum() { + let errors = read_file_batch_errors("plain-dict-uncompressed-checksum.parquet"); + assert_eq!(errors, [Ok(())]); +} +#[test] +fn test_rle_dict_snappy_checksum() { + let errors = read_file_batch_errors("rle-dict-snappy-checksum.parquet"); + assert_eq!(errors, [Ok(())]); +} + +/// Reads a file and returns a vector with one element per record batch. +/// The record batch data is replaced with () and errors are stringified. +fn read_file_batch_errors(name: &str) -> Vec> { + let path = PathBuf::from(parquet_test_data()).join(name); + println!("Reading file: {:?}", path); + let file = std::fs::File::open(&path).unwrap(); + let reader = ArrowReaderBuilder::try_new(file).unwrap().build().unwrap(); + reader + .map(|x| match x { + Ok(_) => Ok(()), + Err(e) => Err(e.to_string()), + }) + .collect() +} diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index cc4c8f3c977b..0e6783583cd5 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -36,6 +36,8 @@ use std::sync::Arc; use tempfile::NamedTempFile; mod bad_data; +#[cfg(feature = "crc")] +mod checksum; mod statistics; // returns a struct array with columns "int32_col", "float32_col" and "float64_col" with the specified values