From 8c956a9f9ab26c14072740cce64c2b99cb039b13 Mon Sep 17 00:00:00 2001
From: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com>
Date: Fri, 23 Aug 2024 17:29:18 +0300
Subject: [PATCH] Support writing UTC adjusted time arrays to parquet (#6278)

* check if time is adjusted to utc from metadata

* add test

* add roundtrip test

* cargo fmt

* Fix regression

---------

Co-authored-by: Andrew Lamb
---
 parquet/src/arrow/arrow_reader/mod.rs | 69 ++++++++++++++++++++++++++-
 parquet/src/arrow/schema/mod.rs       | 10 +++-
 2 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index a0302fa86b3b..253625117c2b 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -932,7 +932,7 @@ pub(crate) fn evaluate_predicate(
 #[cfg(test)]
 mod tests {
     use std::cmp::min;
-    use std::collections::VecDeque;
+    use std::collections::{HashMap, VecDeque};
     use std::fmt::Formatter;
     use std::fs::File;
     use std::io::Seek;
@@ -949,11 +949,14 @@ mod tests {
     use arrow_array::cast::AsArray;
     use arrow_array::types::{
         Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
+        Time32MillisecondType, Time64MicrosecondType,
     };
     use arrow_array::*;
     use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
     use arrow_data::ArrayDataBuilder;
-    use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef};
+    use arrow_schema::{
+        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
+    };
     use arrow_select::concat::concat_batches;
 
     use crate::arrow::arrow_reader::{
@@ -1223,6 +1226,68 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_time_utc_roundtrip() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "time_millis",
+                ArrowDataType::Time32(TimeUnit::Millisecond),
+                true,
+            )
+            .with_metadata(HashMap::from_iter(vec![(
+                "adjusted_to_utc".to_string(),
+                "".to_string(),
+            )])),
+            Field::new(
+                "time_micros",
+                ArrowDataType::Time64(TimeUnit::Microsecond),
+                true,
+            )
+            .with_metadata(HashMap::from_iter(vec![(
+                "adjusted_to_utc".to_string(),
+                "".to_string(),
+            )])),
+        ]));
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
+
+        let original = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(Time32MillisecondArray::from(vec![
+                    Some(-1),
+                    Some(0),
+                    Some(86_399_000),
+                    Some(86_400_000),
+                    Some(86_401_000),
+                    None,
+                ])),
+                Arc::new(Time64MicrosecondArray::from(vec![
+                    Some(-1),
+                    Some(0),
+                    Some(86_399 * 1_000_000),
+                    Some(86_400 * 1_000_000),
+                    Some(86_401 * 1_000_000),
+                    None,
+                ])),
+            ],
+        )?;
+
+        writer.write(&original)?;
+        writer.close()?;
+
+        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
+        let ret = reader.next().unwrap()?;
+        assert_eq!(ret, original);
+
+        // Ensure can be downcast to the correct type
+        ret.column(0).as_primitive::<Time32MillisecondType>();
+        ret.column(1).as_primitive::<Time64MicrosecondType>();
+
+        Ok(())
+    }
+
     struct RandFixedLenGen {}
 
     impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 8c583eebac5b..a3528b6c8adb 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -427,7 +427,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         }
         DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Time {
-                is_adjusted_to_u_t_c: false,
+                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                 unit: match unit {
                     TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                     u => unreachable!("Invalid unit for Time32: {:?}", u),
@@ -438,7 +438,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             .build(),
         DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
             .with_logical_type(Some(LogicalType::Time {
-                is_adjusted_to_u_t_c: false,
+                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                 unit: match unit {
                     TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                     TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
@@ -1430,7 +1430,9 @@ mod tests {
            }
            OPTIONAL INT32 date (DATE);
            OPTIONAL INT32 time_milli (TIME(MILLIS,false));
+           OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
            OPTIONAL INT64 time_micro (TIME_MICROS);
+           OPTIONAL INT64 time_micro_utc (TIME(MICROS, true));
            OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
            REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
            REQUIRED INT64 ts_seconds;
@@ -1481,7 +1483,11 @@ mod tests {
             ),
             Field::new("date", DataType::Date32, true),
             Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
+            Field::new("time_milli_utc", DataType::Time32(TimeUnit::Millisecond), true)
+                .with_metadata(HashMap::from_iter(vec![("adjusted_to_utc".to_string(), "".to_string())])),
             Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
+            Field::new("time_micro_utc", DataType::Time64(TimeUnit::Microsecond), true)
+                .with_metadata(HashMap::from_iter(vec![("adjusted_to_utc".to_string(), "".to_string())])),
             Field::new(
                 "ts_milli",
                 DataType::Timestamp(TimeUnit::Millisecond, None),
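
Usage sketch (not part of the patch): how a caller of the public parquet crate could opt in to
UTC-adjusted time columns after this change. It relies only on the "adjusted_to_utc"
field-metadata key introduced in the diff above; the column name, file path, and helper function
name are illustrative, and the public parquet::arrow::ArrowWriter API is assumed.

    use std::collections::HashMap;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::{ArrayRef, RecordBatch, Time64MicrosecondArray};
    use arrow_schema::{DataType, Field, Schema, TimeUnit};
    use parquet::arrow::ArrowWriter;

    // Hypothetical helper: writes a single UTC-adjusted TIME(MICROS) column.
    fn write_utc_time_column(path: &str) -> Result<(), Box<dyn std::error::Error>> {
        // Tagging the field with the "adjusted_to_utc" metadata key is what
        // arrow_to_parquet_type now checks; its presence makes the Parquet
        // logical type TIME(MICROS, isAdjustedToUTC=true).
        let field = Field::new("event_time", DataType::Time64(TimeUnit::Microsecond), false)
            .with_metadata(HashMap::from([(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )]));
        let schema = Arc::new(Schema::new(vec![field]));

        // Microseconds since midnight, interpreted as UTC by readers.
        let times: ArrayRef = Arc::new(Time64MicrosecondArray::from(vec![
            0_i64,
            86_399 * 1_000_000,
        ]));
        let batch = RecordBatch::try_new(schema.clone(), vec![times])?;

        let mut writer = ArrowWriter::try_new(File::create(path)?, schema, None)?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }

Note that only the presence of the key matters; the check in arrow_to_parquet_type uses
contains_key, so the metadata value itself is ignored.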