From be6c27355b0313db6ed4769223fc3495b686cdff Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Tue, 12 Nov 2024 11:45:16 +0800 Subject: [PATCH] refactor(source): enhance parquet file source (#19221) --- e2e_test/s3/fs_sink.py | 63 ++++-- src/common/src/array/arrow/arrow_impl.rs | 184 ++++++++++++++++-- src/common/src/types/timestamptz.rs | 10 + src/connector/src/parser/parquet_parser.rs | 29 +-- .../opendal_source/opendal_reader.rs | 8 +- 5 files changed, 236 insertions(+), 58 deletions(-) diff --git a/e2e_test/s3/fs_sink.py b/e2e_test/s3/fs_sink.py index 344b1b807d7e4..b068b479d8f83 100644 --- a/e2e_test/s3/fs_sink.py +++ b/e2e_test/s3/fs_sink.py @@ -27,8 +27,14 @@ def gen_data(file_num, item_num_per_file): 'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()), 'test_date': pa.scalar(datetime.now().date(), type=pa.date32()), 'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')), - 'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')), - 'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')), + 'test_timestamp_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s')), + 'test_timestamp_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms')), + 'test_timestamp_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')), + 'test_timestamp_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns')), + 'test_timestamptz_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s', tz='+00:00')), + 'test_timestamptz_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms', tz='+00:00')), + 'test_timestamptz_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us', tz='+00:00')), + 'test_timestamptz_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns', tz='+00:00')), } for item_id in range(item_num_per_file)] for file_id in range(file_num) ] @@ -60,8 +66,15 @@ def _table(): test_bytea bytea, test_date date, test_time time, - test_timestamp timestamp, - test_timestamptz timestamptz, + test_timestamp_s timestamp, + test_timestamp_ms timestamp, + test_timestamp_us timestamp, + test_timestamp_ns timestamp, + test_timestamptz_s timestamptz, + test_timestamptz_ms timestamptz, + test_timestamptz_us timestamptz, + test_timestamptz_ns timestamptz + ) WITH ( connector = 's3', match_pattern = '*.parquet', @@ -128,8 +141,14 @@ def _table(): test_bytea, test_date, test_time, - test_timestamp, - test_timestamptz + test_timestamp_s, + test_timestamp_ms, + test_timestamp_us, + test_timestamp_ns, + test_timestamptz_s, + test_timestamptz_ms, + test_timestamptz_us, + test_timestamptz_ns from {_table()} WITH ( connector = 's3', match_pattern = '*.parquet', @@ -147,7 +166,7 @@ def _table(): print('Sink into s3 in parquet encode...') # Execute a SELECT statement cur.execute(f'''CREATE TABLE test_parquet_sink_table( - id bigint primary key, + id bigint primary key,\ name TEXT, sex bigint, mark bigint, @@ -158,8 +177,14 @@ def _table(): test_bytea bytea, test_date date, test_time time, - test_timestamp timestamp, - test_timestamptz timestamptz, + test_timestamp_s timestamp, + test_timestamp_ms timestamp, + test_timestamp_us timestamp, + test_timestamp_ns timestamp, + test_timestamptz_s timestamptz, + test_timestamptz_ms timestamptz, + test_timestamptz_us timestamptz, + test_timestamptz_ns timestamptz ) WITH ( connector = 's3', match_pattern = 'test_parquet_sink/*.parquet', @@ -196,8 +221,14 @@ def _table(): test_bytea, test_date, test_time, - test_timestamp, - test_timestamptz + test_timestamp_s, + test_timestamp_ms, + test_timestamp_us, + test_timestamp_ns, + test_timestamptz_s, + test_timestamptz_ms, + test_timestamptz_us, + test_timestamptz_ns from {_table()} WITH ( connector = 's3', match_pattern = '*.parquet', @@ -226,8 +257,14 @@ def _table(): test_bytea bytea, test_date date, test_time time, - test_timestamp timestamp, - test_timestamptz timestamptz, + test_timestamp_s timestamp, + test_timestamp_ms timestamp, + test_timestamp_us timestamp, + test_timestamp_ns timestamp, + test_timestamptz_s timestamptz, + test_timestamptz_ms timestamptz, + test_timestamptz_us timestamptz, + test_timestamptz_ns timestamptz ) WITH ( connector = 's3', match_pattern = 'test_json_sink/*.json', diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 3095461a2ebc5..2e35a1403781f 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -42,6 +42,7 @@ use std::fmt::Write; +use arrow_53_schema::TimeUnit; use arrow_array::array; use arrow_array::cast::AsArray; use arrow_buffer::OffsetBuffer; @@ -512,6 +513,12 @@ pub trait FromArrow { Time64(Microsecond) => DataType::Time, Timestamp(Microsecond, None) => DataType::Timestamp, Timestamp(Microsecond, Some(_)) => DataType::Timestamptz, + Timestamp(Second, None) => DataType::Timestamp, + Timestamp(Second, Some(_)) => DataType::Timestamptz, + Timestamp(Millisecond, None) => DataType::Timestamp, + Timestamp(Millisecond, Some(_)) => DataType::Timestamptz, + Timestamp(Nanosecond, None) => DataType::Timestamp, + Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz, Interval(MonthDayNano) => DataType::Interval, Utf8 => DataType::Varchar, Binary => DataType::Bytea, @@ -572,7 +579,6 @@ pub trait FromArrow { if let Some(type_name) = field.metadata().get("ARROW:extension:name") { return self.from_extension_array(type_name, array); } - match array.data_type() { Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()), Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()), @@ -584,12 +590,30 @@ pub trait FromArrow { Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()), Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()), Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()), + Timestamp(Second, None) => { + self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Second, Some(_)) => { + self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Millisecond, None) => { + self.from_timestampms_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Millisecond, Some(_)) => { + self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap()) + } Timestamp(Microsecond, None) => { self.from_timestampus_array(array.as_any().downcast_ref().unwrap()) } Timestamp(Microsecond, Some(_)) => { self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap()) } + Timestamp(Nanosecond, None) => { + self.from_timestampns_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Nanosecond, Some(_)) => { + self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap()) + } Interval(MonthDayNano) => { self.from_interval_array(array.as_any().downcast_ref().unwrap()) } @@ -692,6 +716,33 @@ pub trait FromArrow { Ok(ArrayImpl::Time(array.into())) } + fn from_timestampsecond_array( + &self, + array: &arrow_array::TimestampSecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamp(array.into())) + } + fn from_timestampsecond_some_array( + &self, + array: &arrow_array::TimestampSecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamptz(array.into())) + } + + fn from_timestampms_array( + &self, + array: &arrow_array::TimestampMillisecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamp(array.into())) + } + + fn from_timestampms_some_array( + &self, + array: &arrow_array::TimestampMillisecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamptz(array.into())) + } + fn from_timestampus_array( &self, array: &arrow_array::TimestampMicrosecondArray, @@ -706,6 +757,20 @@ pub trait FromArrow { Ok(ArrayImpl::Timestamptz(array.into())) } + fn from_timestampns_array( + &self, + array: &arrow_array::TimestampNanosecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamp(array.into())) + } + + fn from_timestampns_some_array( + &self, + array: &arrow_array::TimestampNanosecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamptz(array.into())) + } + fn from_interval_array( &self, array: &arrow_array::IntervalMonthDayNanoArray, @@ -842,6 +907,44 @@ macro_rules! converts { } }; } + +macro_rules! converts_with_timeunit { + ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => { + + impl From<&$ArrayType> for $ArrowType { + fn from(array: &$ArrayType) -> Self { + array.iter().map(|o| o.map(|v| v.into_arrow_with_unit($time_unit))).collect() + } + } + + impl From<&$ArrowType> for $ArrayType { + fn from(array: &$ArrowType) -> Self { + array.iter().map(|o| { + o.map(|v| { + let timestamp = <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit); + timestamp + }) + }).collect() + } + } + + impl From<&[$ArrowType]> for $ArrayType { + fn from(arrays: &[$ArrowType]) -> Self { + arrays + .iter() + .flat_map(|a| a.iter()) + .map(|o| { + o.map(|v| { + <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit) + }) + }) + .collect() + } + } + + }; +} + converts!(BoolArray, arrow_array::BooleanArray); converts!(I16Array, arrow_array::Int16Array); converts!(I32Array, arrow_array::Int32Array); @@ -854,11 +957,19 @@ converts!(Utf8Array, arrow_array::StringArray); converts!(Utf8Array, arrow_array::LargeStringArray); converts!(DateArray, arrow_array::Date32Array, @map); converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map); -converts!(TimestampArray, arrow_array::TimestampMicrosecondArray, @map); -converts!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, @map); converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map); converts!(SerialArray, arrow_array::Int64Array, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); + +converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); +converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map); +converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); +converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); + /// Converts RisingWave value from and into Arrow value. trait FromIntoArrow { /// The corresponding element type in the Arrow array. @@ -867,6 +978,16 @@ trait FromIntoArrow { fn into_arrow(self) -> Self::ArrowType; } +/// Converts RisingWave value from and into Arrow value. +/// Specifically used for converting timestamp types according to timeunit. +trait FromIntoArrowWithUnit { + type ArrowType; + /// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp. + type TimestampType; + fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self; + fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType; +} + impl FromIntoArrow for Serial { type ArrowType = i64; @@ -936,34 +1057,55 @@ impl FromIntoArrow for Time { } } -impl FromIntoArrow for Timestamp { +impl FromIntoArrowWithUnit for Timestamp { type ArrowType = i64; + type TimestampType = TimeUnit; - fn from_arrow(value: Self::ArrowType) -> Self { - Timestamp( - DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _) - .unwrap() - .naive_utc(), - ) + fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { + match time_unit { + TimeUnit::Second => { + Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc()) + } + TimeUnit::Millisecond => { + Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc()) + } + TimeUnit::Microsecond => { + Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc()) + } + TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()), + } } - fn into_arrow(self) -> Self::ArrowType { - self.0 - .signed_duration_since(NaiveDateTime::default()) - .num_microseconds() - .unwrap() + fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType { + match time_unit { + TimeUnit::Second => self.0.and_utc().timestamp(), + TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(), + TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(), + TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(), + } } } -impl FromIntoArrow for Timestamptz { +impl FromIntoArrowWithUnit for Timestamptz { type ArrowType = i64; - - fn from_arrow(value: Self::ArrowType) -> Self { - Timestamptz::from_micros(value) + type TimestampType = TimeUnit; + + fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { + match time_unit { + TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(), + TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(), + TimeUnit::Microsecond => Timestamptz::from_micros(value), + TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(), + } } - fn into_arrow(self) -> Self::ArrowType { - self.timestamp_micros() + fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType { + match time_unit { + TimeUnit::Second => self.timestamp(), + TimeUnit::Millisecond => self.timestamp_millis(), + TimeUnit::Microsecond => self.timestamp_micros(), + TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(), + } } } diff --git a/src/common/src/types/timestamptz.rs b/src/common/src/types/timestamptz.rs index 34a6bd2465271..11ea22154e97d 100644 --- a/src/common/src/types/timestamptz.rs +++ b/src/common/src/types/timestamptz.rs @@ -102,6 +102,11 @@ impl Timestamptz { Self(timestamp_micros) } + /// Creates a `Timestamptz` from microseconds. + pub fn from_nanos(timestamp_nanos: i64) -> Option { + timestamp_nanos.checked_div(1_000).map(Self) + } + /// Returns the number of non-leap-microseconds since January 1, 1970 UTC. pub fn timestamp_micros(&self) -> i64 { self.0 @@ -112,6 +117,11 @@ impl Timestamptz { self.0.div_euclid(1_000) } + /// Returns the number of non-leap-nanosseconds since January 1, 1970 UTC. + pub fn timestamp_nanos(&self) -> Option { + self.0.checked_mul(1_000) + } + /// Returns the number of non-leap seconds since January 1, 1970 0:00:00 UTC (aka "UNIX /// timestamp"). pub fn timestamp(&self) -> i64 { diff --git a/src/connector/src/parser/parquet_parser.rs b/src/connector/src/parser/parquet_parser.rs index db2ace3d2b6dd..f656555d7c119 100644 --- a/src/connector/src/parser/parquet_parser.rs +++ b/src/connector/src/parser/parquet_parser.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use deltalake::parquet::arrow::async_reader::AsyncFileReader; use futures_async_stream::try_stream; use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch; -use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert}; +use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk}; use risingwave_common::bail; use risingwave_common::types::{Datum, ScalarImpl}; @@ -104,32 +104,19 @@ impl ParquetParser { crate::source::SourceColumnType::Normal => { match source_column.is_hidden_addition_col { false => { - let rw_data_type = &source_column.data_type; + let rw_data_type: &risingwave_common::types::DataType = + &source_column.data_type; let rw_column_name = &source_column.name; + if let Some(parquet_column) = record_batch.column_by_name(rw_column_name) { let arrow_field = IcebergArrowConvert .to_arrow_field(rw_column_name, rw_data_type)?; - let converted_arrow_data_type: &arrow_schema_iceberg::DataType = - arrow_field.data_type(); - if converted_arrow_data_type == parquet_column.data_type() { - let array_impl = IcebergArrowConvert - .array_from_arrow_array(&arrow_field, parquet_column)?; - let column = Arc::new(array_impl); - chunk_columns.push(column); - } else { - // data type mismatch, this column is set to null. - let mut array_builder = ArrayBuilderImpl::with_type( - column_size, - rw_data_type.clone(), - ); - - array_builder.append_n_null(record_batch.num_rows()); - let res = array_builder.finish(); - let column = Arc::new(res); - chunk_columns.push(column); - } + let array_impl = IcebergArrowConvert + .array_from_arrow_array(&arrow_field, parquet_column)?; + let column = Arc::new(array_impl); + chunk_columns.push(column); } else { // For columns defined in the source schema but not present in the Parquet file, null values are filled in. let mut array_builder = diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 69308a092e2dd..14258d8924658 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -269,10 +269,12 @@ pub fn extract_valid_column_indices( .iter() .position(|&name| name == column.name) .and_then(|pos| { - let arrow_field = IcebergArrowConvert - .to_arrow_field(&column.name, &column.data_type) + // We should convert Arrow field to the rw data type instead of converting the rw data type to the Arrow data type for comparison. + // The reason is that for the timestamp type, the different time units in Arrow need to match with the timestamp and timestamptz in rw. + let arrow_filed_to_rw_data_type = IcebergArrowConvert + .type_from_field(converted_arrow_schema.field(pos)) .ok()?; - if &arrow_field == converted_arrow_schema.field(pos) { + if arrow_filed_to_rw_data_type == column.data_type { Some(pos) } else { None