-
Notifications
You must be signed in to change notification settings - Fork 599
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(source): enhance parquet file source #19221
Changes from all commits
2301fae
c86a8ce
a1d1a9b
d804208
01d102c
7c92a3f
61e79a1
69c8320
857d86d
8d033f7
c83f421
99b6846
4006d13
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,7 @@ | |
|
||
use std::fmt::Write; | ||
|
||
use arrow_53_schema::TimeUnit; | ||
use arrow_array::array; | ||
use arrow_array::cast::AsArray; | ||
use arrow_buffer::OffsetBuffer; | ||
|
@@ -512,6 +513,12 @@ pub trait FromArrow { | |
Time64(Microsecond) => DataType::Time, | ||
Timestamp(Microsecond, None) => DataType::Timestamp, | ||
Timestamp(Microsecond, Some(_)) => DataType::Timestamptz, | ||
Timestamp(Second, None) => DataType::Timestamp, | ||
Timestamp(Second, Some(_)) => DataType::Timestamptz, | ||
Timestamp(Millisecond, None) => DataType::Timestamp, | ||
Timestamp(Millisecond, Some(_)) => DataType::Timestamptz, | ||
Timestamp(Nanosecond, None) => DataType::Timestamp, | ||
Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz, | ||
Interval(MonthDayNano) => DataType::Interval, | ||
Utf8 => DataType::Varchar, | ||
Binary => DataType::Bytea, | ||
|
@@ -572,7 +579,6 @@ pub trait FromArrow { | |
if let Some(type_name) = field.metadata().get("ARROW:extension:name") { | ||
return self.from_extension_array(type_name, array); | ||
} | ||
|
||
match array.data_type() { | ||
Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()), | ||
Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()), | ||
|
@@ -584,12 +590,30 @@ pub trait FromArrow { | |
Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()), | ||
Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()), | ||
Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()), | ||
Timestamp(Second, None) => { | ||
self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap()) | ||
} | ||
Timestamp(Second, Some(_)) => { | ||
self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap()) | ||
} | ||
Timestamp(Millisecond, None) => { | ||
self.from_timestampms_array(array.as_any().downcast_ref().unwrap()) | ||
} | ||
Timestamp(Millisecond, Some(_)) => { | ||
self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap()) | ||
} | ||
Timestamp(Microsecond, None) => { | ||
self.from_timestampus_array(array.as_any().downcast_ref().unwrap()) | ||
} | ||
Timestamp(Microsecond, Some(_)) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not directly related to this PR but I wonder why we ignore the timezone ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Introduced in #17201 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, we have encountered three issues regarding the conversion of the timestamp type:
#17201 fix 1, and this pr fix 2 and 3. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand. Correct me if I am wrong, I think 1 is not fixed completely because we don't use the correct timezone for timestamptz based on the arrow data type. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Double checked, their is no correctness issue: |
||
self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap()) | ||
} | ||
Timestamp(Nanosecond, None) => { | ||
self.from_timestampns_array(array.as_any().downcast_ref().unwrap()) | ||
} | ||
Timestamp(Nanosecond, Some(_)) => { | ||
self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap()) | ||
} | ||
Interval(MonthDayNano) => { | ||
self.from_interval_array(array.as_any().downcast_ref().unwrap()) | ||
} | ||
|
@@ -692,6 +716,33 @@ pub trait FromArrow { | |
Ok(ArrayImpl::Time(array.into())) | ||
} | ||
|
||
fn from_timestampsecond_array( | ||
&self, | ||
array: &arrow_array::TimestampSecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamp(array.into())) | ||
} | ||
fn from_timestampsecond_some_array( | ||
&self, | ||
array: &arrow_array::TimestampSecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamptz(array.into())) | ||
} | ||
|
||
fn from_timestampms_array( | ||
&self, | ||
array: &arrow_array::TimestampMillisecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamp(array.into())) | ||
} | ||
|
||
fn from_timestampms_some_array( | ||
&self, | ||
array: &arrow_array::TimestampMillisecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamptz(array.into())) | ||
} | ||
|
||
fn from_timestampus_array( | ||
&self, | ||
array: &arrow_array::TimestampMicrosecondArray, | ||
|
@@ -706,6 +757,20 @@ pub trait FromArrow { | |
Ok(ArrayImpl::Timestamptz(array.into())) | ||
} | ||
|
||
fn from_timestampns_array( | ||
&self, | ||
array: &arrow_array::TimestampNanosecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamp(array.into())) | ||
} | ||
|
||
fn from_timestampns_some_array( | ||
&self, | ||
array: &arrow_array::TimestampNanosecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamptz(array.into())) | ||
} | ||
|
||
fn from_interval_array( | ||
&self, | ||
array: &arrow_array::IntervalMonthDayNanoArray, | ||
|
@@ -842,6 +907,44 @@ macro_rules! converts { | |
} | ||
}; | ||
} | ||
|
||
macro_rules! converts_with_timeunit { | ||
($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => { | ||
|
||
impl From<&$ArrayType> for $ArrowType { | ||
fn from(array: &$ArrayType) -> Self { | ||
array.iter().map(|o| o.map(|v| v.into_arrow_with_unit($time_unit))).collect() | ||
} | ||
} | ||
|
||
impl From<&$ArrowType> for $ArrayType { | ||
fn from(array: &$ArrowType) -> Self { | ||
array.iter().map(|o| { | ||
o.map(|v| { | ||
let timestamp = <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit); | ||
timestamp | ||
}) | ||
}).collect() | ||
} | ||
} | ||
|
||
impl From<&[$ArrowType]> for $ArrayType { | ||
fn from(arrays: &[$ArrowType]) -> Self { | ||
arrays | ||
.iter() | ||
.flat_map(|a| a.iter()) | ||
.map(|o| { | ||
o.map(|v| { | ||
<<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit) | ||
}) | ||
}) | ||
.collect() | ||
} | ||
} | ||
|
||
}; | ||
} | ||
|
||
converts!(BoolArray, arrow_array::BooleanArray); | ||
converts!(I16Array, arrow_array::Int16Array); | ||
converts!(I32Array, arrow_array::Int32Array); | ||
|
@@ -854,11 +957,19 @@ converts!(Utf8Array, arrow_array::StringArray); | |
converts!(Utf8Array, arrow_array::LargeStringArray); | ||
converts!(DateArray, arrow_array::Date32Array, @map); | ||
converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map); | ||
converts!(TimestampArray, arrow_array::TimestampMicrosecondArray, @map); | ||
converts!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, @map); | ||
converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map); | ||
converts!(SerialArray, arrow_array::Int64Array, @map); | ||
|
||
converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); | ||
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map); | ||
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); | ||
converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); | ||
|
||
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); | ||
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map); | ||
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); | ||
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); | ||
|
||
/// Converts RisingWave value from and into Arrow value. | ||
trait FromIntoArrow { | ||
/// The corresponding element type in the Arrow array. | ||
|
@@ -867,6 +978,16 @@ trait FromIntoArrow { | |
fn into_arrow(self) -> Self::ArrowType; | ||
} | ||
|
||
/// Converts RisingWave value from and into Arrow value. | ||
/// Specifically used for converting timestamp types according to timeunit. | ||
trait FromIntoArrowWithUnit { | ||
type ArrowType; | ||
/// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp. | ||
type TimestampType; | ||
fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self; | ||
fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType; | ||
} | ||
|
||
impl FromIntoArrow for Serial { | ||
type ArrowType = i64; | ||
|
||
|
@@ -936,34 +1057,55 @@ impl FromIntoArrow for Time { | |
} | ||
} | ||
|
||
impl FromIntoArrow for Timestamp { | ||
impl FromIntoArrowWithUnit for Timestamp { | ||
type ArrowType = i64; | ||
type TimestampType = TimeUnit; | ||
|
||
fn from_arrow(value: Self::ArrowType) -> Self { | ||
Timestamp( | ||
DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _) | ||
.unwrap() | ||
.naive_utc(), | ||
) | ||
fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { | ||
match time_unit { | ||
TimeUnit::Second => { | ||
Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc()) | ||
} | ||
TimeUnit::Millisecond => { | ||
Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc()) | ||
} | ||
TimeUnit::Microsecond => { | ||
Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc()) | ||
} | ||
TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()), | ||
} | ||
} | ||
|
||
fn into_arrow(self) -> Self::ArrowType { | ||
self.0 | ||
.signed_duration_since(NaiveDateTime::default()) | ||
.num_microseconds() | ||
.unwrap() | ||
fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType { | ||
match time_unit { | ||
TimeUnit::Second => self.0.and_utc().timestamp(), | ||
TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(), | ||
TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(), | ||
TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(), | ||
} | ||
} | ||
} | ||
|
||
impl FromIntoArrow for Timestamptz { | ||
impl FromIntoArrowWithUnit for Timestamptz { | ||
type ArrowType = i64; | ||
|
||
fn from_arrow(value: Self::ArrowType) -> Self { | ||
Timestamptz::from_micros(value) | ||
type TimestampType = TimeUnit; | ||
|
||
fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { | ||
match time_unit { | ||
TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(), | ||
TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(), | ||
TimeUnit::Microsecond => Timestamptz::from_micros(value), | ||
TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(), | ||
} | ||
} | ||
|
||
fn into_arrow(self) -> Self::ArrowType { | ||
self.timestamp_micros() | ||
fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType { | ||
match time_unit { | ||
TimeUnit::Second => self.timestamp(), | ||
TimeUnit::Millisecond => self.timestamp_millis(), | ||
TimeUnit::Microsecond => self.timestamp_micros(), | ||
TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(), | ||
} | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer to keep time unit designated as a reminder, there are four types of timeunit that need to be handled. Previously, other types were ignored because of the use of
_
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously we use concrete type
Microsecond
instead of_
so it was ignored not because of the use of_
but the contrast. I don't have a strong opinion on this so listing the types explicitly here is also okay. I am more concerned about having separate logics to handle the different time unit separately given that our original implementation won't lose any precision. See my comment below.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
explained in #19221 (comment)